sui_core/consensus_manager/mod.rs

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
use crate::consensus_handler::{
    ConsensusBlockHandler, ConsensusHandlerInitializer, MysticetiConsensusHandler,
};
use crate::consensus_validator::SuiTxValidator;
use crate::mysticeti_adapter::LazyMysticetiClient;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_core::{
    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority,
};
use core::panic;
use fastcrypto::traits::KeyPair as _;
use mysten_metrics::{RegistryID, RegistryService};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::{ConsensusConfig, NodeConfig};
use sui_protocol_config::{ConsensusNetwork, ProtocolVersion};
use sui_types::error::SuiResult;
use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
use sui_types::{
    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
};
use tokio::sync::{Mutex, broadcast};
use tokio::time::{sleep, timeout};
use tracing::{error, info};

#[cfg(test)]
#[path = "../unit_tests/consensus_manager_tests.rs"]
pub mod consensus_manager_tests;

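/// Whether consensus is running, and if so, for which epoch and protocol version.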
#[derive(PartialEq)]
enum Running {
    True(EpochId, ProtocolVersion),
    False,
}

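/// Manages the lifecycle of the Mysticeti consensus authority in a validator:
/// starting consensus for an epoch, wiring up the commit and block handlers,
/// and shutting everything down at the epoch boundary.
///
/// A rough usage sketch, assuming the caller has already constructed the node
/// config, registry service, epoch store, handler initializer, and validator:
///
/// ```ignore
/// let consensus_client = Arc::new(UpdatableConsensusClient::new());
/// let manager = ConsensusManager::new(
///     &node_config,
///     &consensus_config,
///     &registry_service,
///     consensus_client,
/// );
/// manager
///     .start(&node_config, epoch_store, handler_initializer, tx_validator)
///     .await;
/// // ... at the end of the epoch:
/// manager.shutdown().await;
/// ```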
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: ProtocolKeyPair,
    network_keypair: NetworkKeyPair,
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Lazy client used to submit transactions to the local consensus authority.
    client: Arc<LazyMysticetiClient>,
    // Shared client wrapper, updated as consensus restarts across epochs.
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    running: Mutex<Running>,

    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,
}

impl ConsensusManager {
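    /// Creates a new ConsensusManager from the node and consensus configs,
    /// registering its metrics with the default registry.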
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        consensus_client: Arc<UpdatableConsensusClient>,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(
            &registry_service.default_registry(),
        ));
        let client = Arc::new(LazyMysticetiClient::new());
        let (consumer_monitor_sender, _) = broadcast::channel(1);
        Self {
            consensus_config: consensus_config.clone(),
            protocol_keypair: ProtocolKeyPair::new(node_config.worker_key_pair().copy()),
            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
            storage_base_path: consensus_config.db_path().to_path_buf(),
            metrics,
            registry_service: registry_service.clone(),
            authority: ArcSwapOption::empty(),
            client,
            consensus_client,
            consensus_handler: Mutex::new(None),
            consumer_monitor: ArcSwapOption::empty(),
            consumer_monitor_sender,
            running: Mutex::new(Running::False),
            boot_counter: Mutex::new(0),
        }
    }

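    /// Starts the consensus authority for the current epoch: builds the
    /// commit/block handlers, resumes commit processing from the last
    /// processed index, and launches the Mysticeti authority. Logs an error
    /// and returns early if consensus is already running.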
    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();
        let network_type = self.pick_network(&epoch_store);

        let start_time = Instant::now();
        // Hold the lock for the duration of startup to serialize
        // start/shutdown transitions.
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already running for epoch {running_epoch:?} & protocol version {running_version:?} - shut down first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let own_protocol_key = self.protocol_keypair.public();
        let (own_index, _) = committee
            .authorities()
            .find(|(_, a)| a.protocol_key == own_protocol_key)
            .expect("Own authority should be among the consensus authorities!");

        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver, block_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        let consensus_block_handler = ConsensusBlockHandler::new(
            epoch_store.clone(),
            consensus_handler.execution_scheduler_sender().clone(),
            consensus_handler_initializer.backpressure_subscriber(),
            consensus_handler_initializer.metrics().clone(),
        );
        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            consensus_block_handler,
            commit_receiver,
            block_receiver,
            monitor.clone(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // The node participated in the previous run if the previous monitor
        // handled at least one commit.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if the node participated in the
        // previous consensus run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            network_type,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            own_index,
            committee.clone(),
            parameters.clone(),
            protocol_config.clone(),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Initialize the client to send transactions to this consensus instance.
        self.client.set(client);

        // Notify subscribers (e.g. ReplayWaiter) that consensus has started.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        info!(
            "Consensus started for epoch {} & protocol version {:?} - took {} seconds",
            epoch, protocol_config.version, elapsed
        );
    }

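    /// Shuts down the running consensus authority, aborts the consensus
    /// handler, and removes the per-epoch metrics registry. Logs an error and
    /// returns early if consensus is not running.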
    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        let start_time = Instant::now();
        // Hold the lock for the duration of the shutdown to serialize
        // start/shutdown transitions.
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Stop accepting new transaction submissions.
        self.client.clear();

        // Take ownership of the authority; this should be the only remaining reference.
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        authority.stop().await;

        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.shutdown_latency.set(elapsed as i64);

        info!(
            "Consensus shutdown for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }

    pub async fn is_running(&self) -> bool {
        let running = self.running.lock().await;
        matches!(*running, Running::True(_, _))
    }

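    /// Returns a ReplayWaiter that resolves once the consensus handler has
    /// finished replaying commits up to the last processed commit index.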
    pub fn replay_waiter(&self) -> ReplayWaiter {
        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
        ReplayWaiter::new(consumer_monitor_receiver)
    }

    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }

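    /// Picks the consensus network type, preferring a "CONSENSUS_NETWORK" env
    /// var override ("anemo" or "tonic") and falling back to the protocol config.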
    fn pick_network(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusNetwork {
        if let Ok(type_str) = std::env::var("CONSENSUS_NETWORK") {
            match type_str.to_lowercase().as_str() {
                "anemo" => return ConsensusNetwork::Anemo,
                "tonic" => return ConsensusNetwork::Tonic,
                _ => {
                    info!(
                        "Invalid consensus network type {} in env var. Falling back to the value from the protocol config.",
                        type_str
                    );
                }
            }
        }
        epoch_store.protocol_config().consensus_network()
    }
}

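/// An implementation of ConsensusClient whose inner client can be swapped out
/// at any time, e.g. when consensus restarts during reconfiguration.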
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // The inner client; None until consensus starts and set() is called.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

impl UpdatableConsensusClient {
    pub fn new() -> Self {
        Self {
            client: ArcSwapOption::empty(),
        }
    }

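    /// Returns the inner client, polling every RETRY_INTERVAL until one is
    /// set, and panics if consensus has not started within START_TIMEOUT.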
    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
        const START_TIMEOUT: Duration = Duration::from_secs(300);
        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
        if let Ok(client) = timeout(START_TIMEOUT, async {
            loop {
                let Some(client) = self.client.load_full() else {
                    sleep(RETRY_INTERVAL).await;
                    continue;
                };
                return client;
            }
        })
        .await
        {
            return client;
        }

        panic!(
            "Timed out after {:?} waiting for consensus to start!",
            START_TIMEOUT,
        );
    }

    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
        self.client.store(Some(Arc::new(client)));
    }

    pub fn clear(&self) {
        self.client.store(None);
    }
}

#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}

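/// Waits for consensus to finish replaying commits at startup, so dependent
/// components can defer work until the consensus handler has caught up.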
pub struct ReplayWaiter {
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}

impl ReplayWaiter {
    pub(crate) fn new(
        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
    ) -> Self {
        Self {
            consumer_monitor_receiver,
        }
    }

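    /// Blocks until a CommitConsumerMonitor is received and it reports that
    /// replay up to the consumer's last processed commit is complete.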
    pub(crate) async fn wait_for_replay(mut self) {
        loop {
            info!("Waiting for consensus to start replaying ...");
            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
                continue;
            };
            info!("Waiting for consensus handler to finish replaying ...");
            monitor
                .replay_to_consumer_last_processed_commit_complete()
                .await;
            break;
        }
    }
}

impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            // broadcast::Receiver is not Clone; resubscribe() creates a new
            // receiver on the same channel.
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}

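/// Metrics recording how long consensus takes to start up and shut down.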
pub struct ConsensusManagerMetrics {
    start_latency: IntGauge,
    shutdown_latency: IntGauge,
}

impl ConsensusManagerMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}