consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7    Committee, ConsensusProtocolConfig, NetworkKeyPair, NetworkPublicKey, Parameters,
8    ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18    BlockAPI as _, CommitConsumerArgs, RandomnessSignatureHandler,
19    authority_service::AuthorityService,
20    block_manager::BlockManager,
21    block_sync_service::BlockSyncService,
22    block_verifier::SignedBlockVerifier,
23    commit_observer::CommitObserver,
24    commit_syncer::{CommitSyncer, CommitSyncerHandle},
25    commit_vote_monitor::CommitVoteMonitor,
26    context::{Clock, Context},
27    core::{Core, CoreSignals},
28    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
29    dag_state::DagState,
30    leader_schedule::LeaderSchedule,
31    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
32    metrics::initialise_metrics,
33    network::{
34        CommitSyncerClient, NetworkManager, PeerId, SynchronizerClient, tonic_network::TonicManager,
35    },
36    observer_service::ObserverService,
37    observer_subscriber::ObserverSubscriber,
38    peers_pool::PeersPool,
39    round_prober::{RoundProber, RoundProberHandle},
40    round_tracker::RoundTracker,
41    storage::rocksdb_store::RocksDBStore,
42    subscriber::Subscriber,
43    synchronizer::{Synchronizer, SynchronizerHandle},
44    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
45    transaction_vote_tracker::TransactionVoteTracker,
46};
47
/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
///
/// Each variant wraps an `AuthorityNode` specialised for one network stack.
// `AuthorityNode` is only `pub(crate)`, so naming it inside this public enum would
// otherwise trip the `private_interfaces` lint.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Node whose network layer is `TonicManager`.
    WithTonic(AuthorityNode<TonicManager>),
}
54
55impl ConsensusAuthority {
56    pub async fn start(
57        network_type: NetworkType,
58        epoch_start_timestamp_ms: u64,
59        committee: Committee,
60        parameters: Parameters,
61        protocol_config: ConsensusProtocolConfig,
62        // Only required for validator nodes. Observer nodes don't have a protocol keypair.
63        protocol_keypair: Option<ProtocolKeyPair>,
64        network_keypair: NetworkKeyPair,
65        clock: Arc<Clock>,
66        transaction_verifier: Arc<dyn TransactionVerifier>,
67        commit_consumer: CommitConsumerArgs,
68        registry: Registry,
69        // A counter that keeps track of how many times the consensus authority has been booted while the process
70        // has been running. It's useful for making decisions on whether amnesia recovery should run.
71        // When `boot_counter` is 0, `ConsensusAuthority` will initiate the process of amnesia recovery if that's enabled in the parameters.
72        boot_counter: u64,
73        randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
74    ) -> Self {
75        match network_type {
76            NetworkType::Tonic => {
77                let authority = AuthorityNode::start(
78                    epoch_start_timestamp_ms,
79                    committee,
80                    parameters,
81                    protocol_config,
82                    protocol_keypair,
83                    network_keypair,
84                    clock,
85                    transaction_verifier,
86                    commit_consumer,
87                    registry,
88                    boot_counter,
89                    randomness_signature_handler,
90                )
91                .await;
92                Self::WithTonic(authority)
93            }
94        }
95    }
96
97    pub async fn stop(self) {
98        match self {
99            Self::WithTonic(authority) => authority.stop().await,
100        }
101    }
102
103    pub fn update_peer_address(
104        &self,
105        network_pubkey: NetworkPublicKey,
106        address: Option<Multiaddr>,
107    ) {
108        match self {
109            Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
110        }
111    }
112
113    pub fn transaction_client(&self) -> Arc<TransactionClient> {
114        match self {
115            Self::WithTonic(authority) => authority.transaction_client(),
116        }
117    }
118
119    pub fn store(&self) -> Arc<RocksDBStore> {
120        match self {
121            Self::WithTonic(authority) => authority.store(),
122        }
123    }
124
125    #[cfg(test)]
126    fn context(&self) -> &Arc<Context> {
127        match self {
128            Self::WithTonic(authority) => &authority.context,
129        }
130    }
131}
132
/// Selects which network stack `ConsensusAuthority::start` builds the node on.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    /// Start the node as `ConsensusAuthority::WithTonic`.
    Tonic,
}
137
/// Enum to handle different subscriber types based on whether the node is a validator or observer
enum SubscriberType<N: NetworkManager> {
    /// Subscriber used by validator nodes; built on the validator client and `AuthorityService`.
    Validator(Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>),
    /// Subscriber used by observer nodes; built on the observer client and `ObserverService`.
    Observer(ObserverSubscriber<N::ObserverClient, ObserverService>),
}
143
144impl<N: NetworkManager> SubscriberType<N> {
145    fn stop(&self) {
146        match self {
147            SubscriberType::Validator(subscriber) => subscriber.stop(),
148            SubscriberType::Observer(subscriber) => subscriber.stop(),
149        }
150    }
151}
152
/// A running consensus node (validator or observer) together with the handles needed to
/// shut its components down again.
///
/// `N` is the network stack the node was started on.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager,
{
    /// Shared node context (committee, parameters, metrics, own index, ...).
    context: Arc<Context>,
    /// Captured during `start()`; used to report start-up duration, total run time and uptime.
    start_time: Instant,
    /// Client handed out to callers through `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Handle of the synchronizer; stopped first in `stop()`.
    synchronizer: Arc<SynchronizerHandle>,
    /// Store handed out to callers through `store()`.
    store: Arc<RocksDBStore>,

    /// Handle of the running commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle of the round prober; `None` on observer nodes, which do not start one.
    round_prober_handle: Option<RoundProberHandle>,
    /// Handle of the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle of the core thread.
    core_thread_handle: CoreThreadHandle,
    /// Block subscriber, in its validator or observer flavour.
    subscriber: SubscriberType<N>,
    /// Network manager owning the clients and servers of this node.
    network_manager: N,
}
170
171impl<N> AuthorityNode<N>
172where
173    N: NetworkManager,
174{
175    // See comments above ConsensusAuthority::start() for details on the input.
176    pub(crate) async fn start(
177        epoch_start_timestamp_ms: u64,
178        committee: Committee,
179        parameters: Parameters,
180        protocol_config: ConsensusProtocolConfig,
181        protocol_keypair: Option<ProtocolKeyPair>,
182        network_keypair: NetworkKeyPair,
183        clock: Arc<Clock>,
184        transaction_verifier: Arc<dyn TransactionVerifier>,
185        commit_consumer: CommitConsumerArgs,
186        registry: Registry,
187        boot_counter: u64,
188        randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
189    ) -> Self {
190        let metrics = initialise_metrics(registry);
191
192        // If a protocol key pair is provided, then this is a validator node.
193        let own_index = if let Some(protocol_keypair) = &protocol_keypair {
194            let (own_index, _) = committee
195                .authorities()
196                .find(|(_, a)| a.protocol_key == protocol_keypair.public())
197                .expect("Own authority should be among the consensus authorities!");
198
199            let own_hostname = committee.authority(own_index).hostname.clone();
200            info!(
201                "Starting consensus validator authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
202                own_index,
203                own_hostname,
204                protocol_config.protocol_version(),
205                epoch_start_timestamp_ms,
206                boot_counter,
207                commit_consumer.replay_after_commit_index,
208                commit_consumer.consumer_last_processed_commit_index
209            );
210
211            metrics
212                .node_metrics
213                .authority_index
214                .with_label_values(&[&own_hostname])
215                .set(own_index.value() as i64);
216            Some(own_index)
217        } else {
218            // Otherwise this is an observer node and no index exists for it.
219            info!(
220                "Starting consensus observer authority, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
221                protocol_config.protocol_version(),
222                epoch_start_timestamp_ms,
223                boot_counter,
224                commit_consumer.replay_after_commit_index,
225                commit_consumer.consumer_last_processed_commit_index
226            );
227            None
228        };
229
230        info!(
231            "Consensus authorities: {}",
232            committee
233                .authorities()
234                .map(|(i, a)| format!("{}: {}", i, a.hostname))
235                .join(", ")
236        );
237        info!("Consensus parameters: {:?}", parameters);
238        info!("Consensus committee: {:?}", committee);
239        let context = Arc::new(Context::new(
240            epoch_start_timestamp_ms,
241            own_index,
242            committee,
243            parameters,
244            protocol_config,
245            metrics,
246            clock,
247        ));
248        let start_time = Instant::now();
249
250        context
251            .metrics
252            .node_metrics
253            .protocol_version
254            .set(context.protocol_config.protocol_version() as i64);
255
256        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
257        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
258
259        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
260
261        let mut network_manager = N::new(context.clone(), network_keypair);
262        let validator_client = network_manager.validator_client();
263        let observer_client = network_manager.observer_client();
264
265        let synchronizer_client = Arc::new(SynchronizerClient::<
266            N::ValidatorClient,
267            N::ObserverClient,
268        >::new(
269            context.clone(),
270            Some(validator_client.clone()),
271            Some(observer_client.clone()),
272        ));
273        let commit_syncer_client = Arc::new(CommitSyncerClient::<
274            N::ValidatorClient,
275            N::ObserverClient,
276        >::new(
277            context.clone(),
278            Some(validator_client.clone()),
279            Some(observer_client.clone()),
280        ));
281
282        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
283        let store = Arc::new(RocksDBStore::new(store_path));
284        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
285
286        let block_verifier = Arc::new(SignedBlockVerifier::new(
287            context.clone(),
288            transaction_verifier,
289        ));
290
291        let transaction_vote_tracker =
292            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
293
294        // Only sync last known own block if we are a validator and it's the first boot.
295        let sync_last_known_own_block = boot_counter == 0
296            && !context
297                .parameters
298                .sync_last_known_own_block_timeout
299                .is_zero()
300            && context.is_validator();
301        info!(
302            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
303            sync_last_known_own_block,
304            boot_counter,
305            context.parameters.sync_last_known_own_block_timeout
306        );
307
308        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
309
310        let leader_schedule = Arc::new(LeaderSchedule::from_store(
311            context.clone(),
312            dag_state.clone(),
313        ));
314
315        let commit_consumer_monitor = commit_consumer.monitor();
316        let commit_observer = CommitObserver::new(
317            context.clone(),
318            commit_consumer,
319            dag_state.clone(),
320            transaction_vote_tracker.clone(),
321        )
322        .await;
323
324        let initial_received_rounds = dag_state
325            .read()
326            .get_last_cached_block_per_authority(Round::MAX)
327            .into_iter()
328            .map(|(block, _)| block.round())
329            .collect::<Vec<_>>();
330        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
331            context.clone(),
332            initial_received_rounds,
333        )));
334
335        // To avoid accidentally leaking the private key, the protocol key pair should only be
336        // kept in Core.
337        let core = if context.is_validator() {
338            Core::new_validator(
339                context.clone(),
340                leader_schedule,
341                tx_consumer,
342                transaction_vote_tracker.clone(),
343                block_manager,
344                commit_observer,
345                core_signals,
346                protocol_keypair.expect("protocol keypair is required when running as validator"),
347                dag_state.clone(),
348                sync_last_known_own_block,
349                round_tracker.clone(),
350            )
351        } else {
352            Core::new_observer(
353                context.clone(),
354                leader_schedule,
355                block_manager,
356                commit_observer,
357                core_signals,
358                dag_state.clone(),
359            )
360        };
361
362        let (core_dispatcher, core_thread_handle) =
363            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
364        let core_dispatcher = Arc::new(core_dispatcher);
365        let leader_timeout_handle =
366            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
367
368        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
369
370        // Create the PeersPool
371        let peers_pool = Arc::new(PeersPool::new(context.clone()));
372
373        let synchronizer = Synchronizer::start(
374            synchronizer_client.clone(),
375            context.clone(),
376            core_dispatcher.clone(),
377            commit_vote_monitor.clone(),
378            block_verifier.clone(),
379            transaction_vote_tracker.clone(),
380            round_tracker.clone(),
381            dag_state.clone(),
382            peers_pool.clone(),
383            sync_last_known_own_block,
384        );
385
386        let commit_syncer_handle = CommitSyncer::new(
387            context.clone(),
388            core_dispatcher.clone(),
389            commit_vote_monitor.clone(),
390            commit_consumer_monitor.clone(),
391            block_verifier.clone(),
392            transaction_vote_tracker.clone(),
393            round_tracker.clone(),
394            commit_syncer_client.clone(),
395            dag_state.clone(),
396            peers_pool.clone(),
397        )
398        .start();
399
400        // Create BlockSyncService that will be shared by both AuthorityService and ObserverService
401        let block_sync_service = Arc::new(BlockSyncService::new(
402            context.clone(),
403            dag_state.clone(),
404            store.clone(),
405        ));
406
407        let (subscriber, round_prober_handle) = if context.is_validator() {
408            let authority_service = Arc::new(AuthorityService::new(
409                context.clone(),
410                block_verifier.clone(),
411                commit_vote_monitor.clone(),
412                round_tracker.clone(),
413                synchronizer.clone(),
414                core_dispatcher.clone(),
415                signals_receivers.block_broadcast_receiver(),
416                transaction_vote_tracker.clone(),
417                dag_state.clone(),
418                block_sync_service.clone(),
419            ));
420
421            // Start the validator server if this is a validator node.
422            network_manager
423                .start_validator_server(authority_service.clone())
424                .await;
425
426            // Validator node: subscribe to all other validators
427            let s = Subscriber::new(
428                context.clone(),
429                validator_client.clone(),
430                authority_service.clone(),
431                dag_state.clone(),
432            );
433            for (peer, _) in context.committee.authorities() {
434                if peer != context.own_index {
435                    s.subscribe(peer);
436                }
437            }
438
439            // Start the round prober
440            let round_prober_handle = Some(
441                RoundProber::new(
442                    context.clone(),
443                    core_dispatcher.clone(),
444                    round_tracker.clone(),
445                    dag_state.clone(),
446                    validator_client,
447                )
448                .start(),
449            );
450
451            // Start the observer server if the observer server is enabled in the parameters.
452            if context.parameters.observer.is_server_enabled() {
453                let observer_service = Arc::new(ObserverService::new(
454                    context.clone(),
455                    core_dispatcher.clone(),
456                    dag_state.clone(),
457                    signals_receivers.accepted_block_broadcast_receiver(),
458                    block_verifier,
459                    commit_vote_monitor.clone(),
460                    transaction_vote_tracker.clone(),
461                    synchronizer.clone(),
462                    block_sync_service.clone(),
463                    randomness_signature_handler.clone(),
464                ));
465                network_manager
466                    .start_observer_server(observer_service)
467                    .await;
468            }
469
470            (SubscriberType::Validator(s), round_prober_handle)
471        } else {
472            // Observer node: subscribe to specified peer(s) using ObserverSubscriber
473            let observer_client = network_manager.observer_client();
474            let observer_service = Arc::new(ObserverService::new(
475                context.clone(),
476                core_dispatcher.clone(),
477                dag_state.clone(),
478                signals_receivers.accepted_block_broadcast_receiver(),
479                block_verifier,
480                commit_vote_monitor.clone(),
481                transaction_vote_tracker.clone(),
482                synchronizer.clone(),
483                block_sync_service.clone(),
484                randomness_signature_handler.clone(),
485            ));
486
487            let observer_subscriber = ObserverSubscriber::new(
488                context.clone(),
489                observer_client,
490                observer_service.clone(),
491                commit_vote_monitor.clone(),
492                dag_state.clone(),
493                randomness_signature_handler,
494            );
495
496            network_manager
497                .start_observer_server(observer_service)
498                .await;
499
500            // Subscribe to peers specified in the configuration
501            // For now get the first peer from the list to connect to.
502            // TODO: support multiple peers - as in choose/detect which one to connect to.
503            for peer_record in context.parameters.observer.peers.iter().take(1) {
504                let peer_id = if let Some((index, _)) = context
505                    .committee
506                    .authorities()
507                    .find(|(_, authority)| authority.network_key == peer_record.public_key)
508                {
509                    PeerId::Validator(index)
510                } else {
511                    PeerId::Observer(Box::new(peer_record.public_key.clone()))
512                };
513
514                info!("Observer subscribing to peer: {:?}", peer_id);
515                observer_subscriber.subscribe(peer_id);
516            }
517
518            (SubscriberType::Observer(observer_subscriber), None)
519        };
520
521        info!(
522            "Consensus authority started, took {:?}",
523            start_time.elapsed()
524        );
525
526        Self {
527            context,
528            start_time,
529            transaction_client: Arc::new(tx_client),
530            synchronizer,
531            store,
532            commit_syncer_handle,
533            round_prober_handle,
534            leader_timeout_handle,
535            core_thread_handle,
536            subscriber,
537            network_manager,
538        }
539    }
540
541    pub(crate) async fn stop(mut self) {
542        info!(
543            "Stopping authority. Total run time: {:?}",
544            self.start_time.elapsed()
545        );
546
547        // First shutdown components calling into Core.
548        if let Err(e) = self.synchronizer.stop().await {
549            if e.is_panic() {
550                std::panic::resume_unwind(e.into_panic());
551            }
552            warn!(
553                "Failed to stop synchronizer when shutting down consensus: {:?}",
554                e
555            );
556        };
557        self.commit_syncer_handle.stop().await;
558        if let Some(round_prober_handle) = self.round_prober_handle {
559            round_prober_handle.stop().await;
560        }
561        self.leader_timeout_handle.stop().await;
562        // Shutdown Core to stop block productions and broadcast.
563        self.core_thread_handle.stop().await;
564        // Stop block subscriptions before stopping network server.
565        self.subscriber.stop();
566        self.network_manager.stop().await;
567
568        self.context
569            .metrics
570            .node_metrics
571            .uptime
572            .observe(self.start_time.elapsed().as_secs_f64());
573    }
574
575    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
576        self.transaction_client.clone()
577    }
578
579    pub(crate) fn store(&self) -> Arc<RocksDBStore> {
580        self.store.clone()
581    }
582
583    pub(crate) fn update_peer_address(
584        &self,
585        network_pubkey: NetworkPublicKey,
586        address: Option<Multiaddr>,
587    ) {
588        // Find the peer index for this network key
589        let Some(peer) = self
590            .context
591            .committee
592            .authorities()
593            .find(|(_, authority)| authority.network_key == network_pubkey)
594            .map(|(index, _)| index)
595        else {
596            warn!(
597                "Network public key {:?} not found in committee, ignoring address update",
598                network_pubkey
599            );
600            return;
601        };
602
603        // Update the address in the network manager
604        self.network_manager.update_peer_address(peer, address);
605
606        // Re-subscribe to the peer to force reconnection with new address
607        if peer != self.context.own_index {
608            info!("Re-subscribing to peer {} after address update", peer);
609            match &self.subscriber {
610                SubscriberType::Validator(s) => s.subscribe(peer),
611                SubscriberType::Observer(s) => {
612                    // For observer, create a PeerId for the validator
613                    s.subscribe(PeerId::Validator(peer));
614                }
615            }
616        }
617    }
618}
619
620#[cfg(test)]
621mod tests {
622    #![allow(non_snake_case)]
623
624    use std::{
625        collections::{BTreeMap, BTreeSet},
626        sync::Arc,
627        time::Duration,
628    };
629
630    use consensus_config::{
631        AuthorityIndex, ObserverParameters, Parameters, PeerRecord, local_committee_and_keys,
632    };
633    use mysten_metrics::RegistryService;
634    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
635    use prometheus::Registry;
636    use rand::{SeedableRng, rngs::StdRng};
637    use rstest::rstest;
638    use tempfile::TempDir;
639    use tokio::time::{sleep, timeout};
640    use typed_store::DBMetrics;
641
642    use super::*;
643    use crate::{
644        CommittedSubDag,
645        block::{BlockAPI as _, GENESIS_ROUND},
646        transaction::NoopTransactionVerifier,
647    };
648
649    #[rstest]
650    #[tokio::test]
651    async fn test_authority_start_and_stop(
652        #[values(NetworkType::Tonic)] network_type: NetworkType,
653    ) {
654        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
655        let registry = Registry::new();
656
657        let temp_dir = TempDir::new().unwrap();
658        let parameters = Parameters {
659            db_path: temp_dir.keep(),
660            ..Default::default()
661        };
662        let txn_verifier = NoopTransactionVerifier {};
663
664        let own_index = committee.to_authority_index(0).unwrap();
665        let protocol_keypair = keypairs[own_index].1.clone();
666        let network_keypair = keypairs[own_index].0.clone();
667
668        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
669
670        let authority = ConsensusAuthority::start(
671            network_type,
672            0,
673            committee,
674            parameters,
675            ConsensusProtocolConfig::for_testing(),
676            Some(protocol_keypair),
677            network_keypair,
678            Arc::new(Clock::default()),
679            Arc::new(txn_verifier),
680            commit_consumer,
681            registry,
682            0,
683            None,
684        )
685        .await;
686
687        assert_eq!(authority.context().own_index, own_index);
688        assert_eq!(authority.context().committee.epoch(), 0);
689        assert_eq!(authority.context().committee.size(), 1);
690
691        authority.stop().await;
692    }
693
694    #[rstest]
695    #[tokio::test]
696    async fn test_observer_start_and_stop(#[values(NetworkType::Tonic)] network_type: NetworkType) {
697        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
698        let registry = Registry::new();
699
700        let temp_dir = TempDir::new().unwrap();
701        let parameters = Parameters {
702            db_path: temp_dir.keep(),
703            ..Default::default()
704        };
705        let txn_verifier = NoopTransactionVerifier {};
706
707        // Use any network keypair for the observer, it doesn't need to match a committee member
708        let network_keypair = keypairs[0].0.clone();
709
710        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
711
712        let observer = ConsensusAuthority::start(
713            network_type,
714            0,
715            committee.clone(),
716            parameters,
717            ConsensusProtocolConfig::for_testing(),
718            None, // No protocol keypair for observer node
719            network_keypair,
720            Arc::new(Clock::default()),
721            Arc::new(txn_verifier),
722            commit_consumer,
723            registry,
724            0,
725            None,
726        )
727        .await;
728
729        sleep(Duration::from_secs(2)).await;
730
731        // Observer nodes have own_index set to MAX as a special value
732        assert_eq!(observer.context().own_index, AuthorityIndex::MAX);
733        assert_eq!(observer.context().committee.epoch(), 0);
734        assert_eq!(observer.context().committee.size(), 1);
735        assert!(!observer.context().is_validator());
736
737        observer.stop().await;
738    }
739
740    // TODO: build AuthorityFixture.
741    // Spins up a committee of authorities and an observer node that connects to authority 0.
742    // Verifies that the network is progressing, advancing rounds and commits. It also verifies
743    // that the Observer node is receiving blocks from the network.
744    #[rstest]
745    #[tokio::test(flavor = "current_thread")]
746    async fn test_authority_committee(
747        #[values(NetworkType::Tonic)] network_type: NetworkType,
748        #[values(5, 10)] gc_depth: u32,
749    ) {
750        telemetry_subscribers::init_for_testing();
751        let db_registry = Registry::new();
752        DBMetrics::init(RegistryService::new(db_registry));
753
754        const NUM_OF_AUTHORITIES: usize = 4;
755        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
756        let mut protocol_config = ConsensusProtocolConfig::for_testing();
757        protocol_config.set_gc_depth_for_testing(gc_depth);
758
759        let temp_dirs = (0..NUM_OF_AUTHORITIES)
760            .map(|_| TempDir::new().unwrap())
761            .collect::<Vec<_>>();
762
763        let mut commit_receivers = Vec::with_capacity(committee.size());
764        let mut authorities = Vec::with_capacity(committee.size());
765        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
766
767        // Use a unique port based on gc_depth to avoid conflicts between parallel tests
768        let observer_server_port = 8900 + gc_depth as u16;
769
770        // Create authorities with observer server enabled for authority 0
771        let mut authority_0_network_key = None;
772        for (index, authority_info) in committee.authorities() {
773            let (authority, commit_receiver) = if index.value() == 0 {
774                // Save authority 0's network key for Observer connection
775                authority_0_network_key = Some(authority_info.network_key.clone());
776                // Enable observer server for authority 0
777                make_authority_with_observer_server(
778                    index,
779                    &temp_dirs[index.value()],
780                    committee.clone(),
781                    keypairs.clone(),
782                    network_type,
783                    boot_counters[index],
784                    protocol_config.clone(),
785                    Some(observer_server_port),
786                )
787                .await
788            } else {
789                make_authority(
790                    index,
791                    &temp_dirs[index.value()],
792                    committee.clone(),
793                    keypairs.clone(),
794                    network_type,
795                    boot_counters[index],
796                    protocol_config.clone(),
797                )
798                .await
799            };
800            boot_counters[index] += 1;
801            commit_receivers.push(commit_receiver);
802            authorities.push(authority);
803        }
804
805        // Create an Observer node that connects to authority 0
806        let observer_temp_dir = TempDir::new().unwrap();
807        let mut rng = StdRng::from_seed([99; 32]);
808        let observer_network_keypair = consensus_config::NetworkKeyPair::generate(&mut rng);
809
810        let observer_parameters = Parameters {
811            db_path: observer_temp_dir.path().to_path_buf(),
812            observer: ObserverParameters {
813                // Configure Observer to connect to authority 0
814                peers: vec![PeerRecord {
815                    public_key: authority_0_network_key
816                        .clone()
817                        .expect("Authority 0 network key should be set"),
818                    address: format!("/ip4/127.0.0.1/udp/{}", observer_server_port)
819                        .parse()
820                        .unwrap(),
821                }],
822                ..Default::default()
823            },
824            ..Default::default()
825        };
826
827        let (observer_commit_consumer, observer_commit_receiver) = CommitConsumerArgs::new(0, 0);
828        let observer = ConsensusAuthority::start(
829            network_type,
830            0,
831            committee.clone(),
832            observer_parameters,
833            protocol_config.clone(),
834            None, // No protocol keypair for observer
835            observer_network_keypair,
836            Arc::new(Clock::default()),
837            Arc::new(NoopTransactionVerifier {}),
838            observer_commit_consumer,
839            Registry::new(),
840            0,
841            None,
842        )
843        .await;
844        // The relevant endpoints are now implemented for the synchronizer and commit_syncer components, so the Observer node should be able to catch up and
845        // fetch blocks beyond the latest ones that are fetched from the stream.
846        commit_receivers.push(observer_commit_receiver);
847
848        // Give Observer more time to connect and sync
849        sleep(Duration::from_secs(5)).await;
850
851        const NUM_TRANSACTIONS: u8 = 15;
852        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
853        for i in 0..NUM_TRANSACTIONS {
854            let txn = vec![i; 16];
855            submitted_transactions.insert(txn.clone());
856            authorities[i as usize % authorities.len()]
857                .transaction_client()
858                .submit(vec![txn])
859                .await
860                .unwrap();
861        }
862
863        for receiver in &mut commit_receivers {
864            let mut expected_transactions = submitted_transactions.clone();
865            loop {
866                let committed_subdag =
867                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
868                        .await
869                        .unwrap()
870                        .unwrap();
871                for b in committed_subdag.blocks {
872                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
873                        assert!(
874                            expected_transactions.remove(&txn),
875                            "Transaction not submitted or already seen: {:?}",
876                            txn
877                        );
878                    }
879                }
880                if expected_transactions.is_empty() {
881                    break;
882                }
883            }
884        }
885
886        // Stop authority 1.
887        let index = committee.to_authority_index(1).unwrap();
888        authorities.remove(index.value()).stop().await;
889        sleep(Duration::from_secs(10)).await;
890
891        // Restart authority 1 and let it run.
892        let (authority, commit_receiver) = make_authority(
893            index,
894            &temp_dirs[index.value()],
895            committee.clone(),
896            keypairs.clone(),
897            network_type,
898            boot_counters[index],
899            protocol_config.clone(),
900        )
901        .await;
902        boot_counters[index] += 1;
903        commit_receivers[index] = commit_receiver;
904        authorities.insert(index.value(), authority);
905        sleep(Duration::from_secs(10)).await;
906
907        // Verify that the Observer node is running
908        // TODO: The actual block processing for observers is not fully implemented yet
909        // for now we just verify that blocks are received and the number of received blocks is not far from
910        // the number of blocks sent by authority 0.
911        let observer_context = observer.context();
912        assert!(
913            observer_context.is_observer(),
914            "It should be an observer node"
915        );
916
917        // Get the total verified_blocks from authority 0 (sum across all sending authorities)
918        let authority_0 = &authorities[0];
919        let authority_0_context = authority_0.context();
920        let mut authority_0_total_verified_blocks = 0;
921
922        // Sum verified_blocks from all authorities as seen by authority 0
923        for (_, authority_info) in committee.authorities() {
924            if let Ok(metric) = authority_0_context
925                .metrics
926                .node_metrics
927                .verified_blocks
928                .get_metric_with_label_values(&[&authority_info.hostname])
929            {
930                authority_0_total_verified_blocks += metric.get();
931                println!(
932                    "authority_info.hostname: {}, metric: {:?}",
933                    authority_info.hostname, authority_0_total_verified_blocks
934                );
935            }
936        }
937
938        let mut authority_0_total_proposed_blocks = 0;
939        for force in [true, false] {
940            if let Ok(metric) = authority_0_context
941                .metrics
942                .node_metrics
943                .proposed_blocks
944                .get_metric_with_label_values(&[&force.to_string()])
945            {
946                authority_0_total_proposed_blocks += metric.get();
947            }
948        }
949
950        authority_0_total_verified_blocks += authority_0_total_proposed_blocks;
951
952        // Sum verified_blocks from all authorities as seen by the observer
953        let mut observer_received_blocks = 0;
954        for (_, authority_info) in committee.authorities() {
955            if let Ok(metric) = observer_context
956                .metrics
957                .node_metrics
958                .verified_blocks
959                .get_metric_with_label_values(&[&authority_info.hostname])
960            {
961                observer_received_blocks += metric.get();
962            }
963        }
964
965        // Compare the values - they should be related but might not be exactly equal
966        // due to timing and the observer connecting mid-stream
967        assert!(
968            observer_received_blocks > 0,
969            "Observer should have received at least some blocks, got: {}",
970            observer_received_blocks
971        );
972
973        println!(
974            "authority_0_total_verified_blocks: {}, observer_received_blocks: {}",
975            authority_0_total_verified_blocks, observer_received_blocks
976        );
977
978        const TOLERANCE: u64 = 20;
979        assert!(
980            authority_0_total_verified_blocks - observer_received_blocks <= TOLERANCE,
981            "The number of blocks received by the observer ({}) should be close to the number of blocks verified by authority 0 ({})",
982            observer_received_blocks,
983            authority_0_total_verified_blocks,
984        );
985
986        // Stop observer first
987        observer.stop().await;
988
989        // Stop all authorities and exit.
990        for authority in authorities {
991            authority.stop().await;
992        }
993    }
994
995    #[rstest]
996    #[tokio::test(flavor = "current_thread")]
997    async fn test_small_committee(
998        #[values(NetworkType::Tonic)] network_type: NetworkType,
999        #[values(1, 2, 3)] num_authorities: usize,
1000    ) {
1001        telemetry_subscribers::init_for_testing();
1002        let db_registry = Registry::new();
1003        DBMetrics::init(RegistryService::new(db_registry));
1004
1005        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
1006        let protocol_config = ConsensusProtocolConfig::for_testing();
1007
1008        let temp_dirs = (0..num_authorities)
1009            .map(|_| TempDir::new().unwrap())
1010            .collect::<Vec<_>>();
1011
1012        let mut output_receivers = Vec::with_capacity(committee.size());
1013        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
1014        let mut boot_counters = vec![0; num_authorities];
1015
1016        for (index, _authority_info) in committee.authorities() {
1017            let (authority, commit_receiver) = make_authority(
1018                index,
1019                &temp_dirs[index.value()],
1020                committee.clone(),
1021                keypairs.clone(),
1022                network_type,
1023                boot_counters[index],
1024                protocol_config.clone(),
1025            )
1026            .await;
1027            boot_counters[index] += 1;
1028            output_receivers.push(commit_receiver);
1029            authorities.push(authority);
1030        }
1031
1032        const NUM_TRANSACTIONS: u8 = 15;
1033        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
1034        for i in 0..NUM_TRANSACTIONS {
1035            let txn = vec![i; 16];
1036            submitted_transactions.insert(txn.clone());
1037            authorities[i as usize % authorities.len()]
1038                .transaction_client()
1039                .submit(vec![txn])
1040                .await
1041                .unwrap();
1042        }
1043
1044        for receiver in &mut output_receivers {
1045            let mut expected_transactions = submitted_transactions.clone();
1046            loop {
1047                let committed_subdag =
1048                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
1049                        .await
1050                        .unwrap()
1051                        .unwrap();
1052                for b in committed_subdag.blocks {
1053                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
1054                        assert!(
1055                            expected_transactions.remove(&txn),
1056                            "Transaction not submitted or already seen: {:?}",
1057                            txn
1058                        );
1059                    }
1060                }
1061                if expected_transactions.is_empty() {
1062                    break;
1063                }
1064            }
1065        }
1066
1067        // Stop authority 0.
1068        let index = committee.to_authority_index(0).unwrap();
1069        authorities.remove(index.value()).stop().await;
1070        sleep(Duration::from_secs(10)).await;
1071
1072        // Restart authority 0 and let it run.
1073        let (authority, commit_receiver) = make_authority(
1074            index,
1075            &temp_dirs[index.value()],
1076            committee.clone(),
1077            keypairs.clone(),
1078            network_type,
1079            boot_counters[index],
1080            protocol_config.clone(),
1081        )
1082        .await;
1083        boot_counters[index] += 1;
1084        output_receivers[index] = commit_receiver;
1085        authorities.insert(index.value(), authority);
1086        sleep(Duration::from_secs(10)).await;
1087
1088        // Stop all authorities and exit.
1089        for authority in authorities {
1090            authority.stop().await;
1091        }
1092    }
1093
1094    #[rstest]
1095    #[tokio::test(flavor = "current_thread")]
1096    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
1097        telemetry_subscribers::init_for_testing();
1098        let db_registry = Registry::new();
1099        DBMetrics::init(RegistryService::new(db_registry));
1100
1101        const NUM_OF_AUTHORITIES: usize = 4;
1102        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
1103        let mut commit_receivers = vec![];
1104        let mut authorities = BTreeMap::new();
1105        let mut temp_dirs = BTreeMap::new();
1106        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
1107
1108        let mut protocol_config = ConsensusProtocolConfig::for_testing();
1109        protocol_config.set_gc_depth_for_testing(gc_depth);
1110
1111        for (index, _authority_info) in committee.authorities() {
1112            let dir = TempDir::new().unwrap();
1113            let (authority, commit_receiver) = make_authority(
1114                index,
1115                &dir,
1116                committee.clone(),
1117                keypairs.clone(),
1118                NetworkType::Tonic,
1119                boot_counters[index],
1120                protocol_config.clone(),
1121            )
1122            .await;
1123            boot_counters[index] += 1;
1124            commit_receivers.push(commit_receiver);
1125            authorities.insert(index, authority);
1126            temp_dirs.insert(index, dir);
1127        }
1128
1129        // Now we take the receiver of authority 1 and we wait until we see at least one block committed from this authority
1130        // We wait until we see at least one committed block authored from this authority. That way we'll be 100% sure that
1131        // at least one block has been proposed and successfully received by a quorum of nodes.
1132        let index_1 = committee.to_authority_index(1).unwrap();
1133        'outer: while let Some(result) =
1134            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
1135                .await
1136                .expect("Timed out while waiting for at least one committed block from authority 1")
1137        {
1138            for block in result.blocks {
1139                if block.round() > GENESIS_ROUND && block.author() == index_1 {
1140                    break 'outer;
1141                }
1142            }
1143        }
1144
1145        // Stop authority 1 & 2.
1146        // * Authority 1 will be used to wipe out their DB and practically "force" the amnesia recovery.
1147        // * Authority 2 is stopped in order to simulate less than f+1 availability which will
1148        // make authority 1 retry during amnesia recovery until it has finally managed to successfully get back f+1 responses.
1149        // once authority 2 is up and running again.
1150        authorities.remove(&index_1).unwrap().stop().await;
1151        let index_2 = committee.to_authority_index(2).unwrap();
1152        authorities.remove(&index_2).unwrap().stop().await;
1153        sleep(Duration::from_secs(5)).await;
1154
1155        // Authority 1: create a new directory to simulate amnesia. The node will start having participated previously
1156        // to consensus but now will attempt to synchronize the last own block and recover from there. It won't be able
1157        // to do that successfully as authority 2 is still down.
1158        let dir = TempDir::new().unwrap();
1159        // We do reset the boot counter for this one to simulate a "binary" restart
1160        boot_counters[index_1] = 0;
1161        let (authority, mut commit_receiver) = make_authority(
1162            index_1,
1163            &dir,
1164            committee.clone(),
1165            keypairs.clone(),
1166            NetworkType::Tonic,
1167            boot_counters[index_1],
1168            protocol_config.clone(),
1169        )
1170        .await;
1171        boot_counters[index_1] += 1;
1172        authorities.insert(index_1, authority);
1173        temp_dirs.insert(index_1, dir);
1174        sleep(Duration::from_secs(5)).await;
1175
1176        // Now spin up authority 2 using its earlier directly - so no amnesia recovery should be forced here.
1177        // Authority 1 should be able to recover from amnesia successfully.
1178        let (authority, _commit_receiver) = make_authority(
1179            index_2,
1180            &temp_dirs[&index_2],
1181            committee.clone(),
1182            keypairs,
1183            NetworkType::Tonic,
1184            boot_counters[index_2],
1185            protocol_config.clone(),
1186        )
1187        .await;
1188        boot_counters[index_2] += 1;
1189        authorities.insert(index_2, authority);
1190        sleep(Duration::from_secs(5)).await;
1191
1192        // We wait until we see at least one committed block authored from this authority
1193        'outer: while let Some(result) = commit_receiver.recv().await {
1194            for block in result.blocks {
1195                if block.round() > GENESIS_ROUND && block.author() == index_1 {
1196                    break 'outer;
1197                }
1198            }
1199        }
1200
1201        // Stop all authorities and exit.
1202        for (_, authority) in authorities {
1203            authority.stop().await;
1204        }
1205    }
1206
1207    // TODO: create a fixture
1208    async fn make_authority(
1209        index: AuthorityIndex,
1210        db_dir: &TempDir,
1211        committee: Committee,
1212        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1213        network_type: NetworkType,
1214        boot_counter: u64,
1215        protocol_config: ConsensusProtocolConfig,
1216    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1217        make_authority_with_observer_server(
1218            index,
1219            db_dir,
1220            committee,
1221            keypairs,
1222            network_type,
1223            boot_counter,
1224            protocol_config,
1225            None, // No observer server port
1226        )
1227        .await
1228    }
1229
1230    async fn make_authority_with_observer_server(
1231        index: AuthorityIndex,
1232        db_dir: &TempDir,
1233        committee: Committee,
1234        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1235        network_type: NetworkType,
1236        boot_counter: u64,
1237        protocol_config: ConsensusProtocolConfig,
1238        observer_server_port: Option<u16>,
1239    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1240        let registry = Registry::new();
1241
1242        // Cache less blocks to exercise commit sync.
1243        let mut parameters = Parameters {
1244            db_path: db_dir.path().to_path_buf(),
1245            dag_state_cached_rounds: 5,
1246            commit_sync_parallel_fetches: 2,
1247            commit_sync_batch_size: 3,
1248            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1249            ..Default::default()
1250        };
1251
1252        // Enable observer server if port is provided
1253        if let Some(port) = observer_server_port {
1254            parameters.observer.server_port = Some(port);
1255        }
1256
1257        let txn_verifier = NoopTransactionVerifier {};
1258
1259        let protocol_keypair = keypairs[index].1.clone();
1260        let network_keypair = keypairs[index].0.clone();
1261
1262        let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
1263
1264        let authority = ConsensusAuthority::start(
1265            network_type,
1266            0,
1267            committee,
1268            parameters,
1269            protocol_config,
1270            Some(protocol_keypair),
1271            network_keypair,
1272            Arc::new(Clock::default()),
1273            Arc::new(txn_verifier),
1274            commit_consumer,
1275            registry,
1276            boot_counter,
1277            None,
1278        )
1279        .await;
1280
1281        (authority, commit_receiver)
1282    }
1283}