// consensus_core/authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7    Committee, ConsensusProtocolConfig, NetworkKeyPair, NetworkPublicKey, Parameters,
8    ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18    BlockAPI as _, CommitConsumerArgs,
19    authority_service::AuthorityService,
20    block_manager::BlockManager,
21    block_sync_service::BlockSyncService,
22    block_verifier::SignedBlockVerifier,
23    commit_observer::CommitObserver,
24    commit_syncer::{CommitSyncer, CommitSyncerHandle},
25    commit_vote_monitor::CommitVoteMonitor,
26    context::{Clock, Context},
27    core::{Core, CoreSignals},
28    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
29    dag_state::DagState,
30    leader_schedule::LeaderSchedule,
31    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
32    metrics::initialise_metrics,
33    network::{
34        CommitSyncerClient, NetworkManager, PeerId, SynchronizerClient, tonic_network::TonicManager,
35    },
36    observer_service::ObserverService,
37    observer_subscriber::ObserverSubscriber,
38    peers_pool::PeersPool,
39    round_prober::{RoundProber, RoundProberHandle},
40    round_tracker::RoundTracker,
41    storage::rocksdb_store::RocksDBStore,
42    subscriber::Subscriber,
43    synchronizer::{Synchronizer, SynchronizerHandle},
44    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
45    transaction_vote_tracker::TransactionVoteTracker,
46};
47
/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// An authority node backed by the tonic-based network implementation.
    /// Currently this is the only supported network stack.
    WithTonic(AuthorityNode<TonicManager>),
}
54
55impl ConsensusAuthority {
56    pub async fn start(
57        network_type: NetworkType,
58        epoch_start_timestamp_ms: u64,
59        committee: Committee,
60        parameters: Parameters,
61        protocol_config: ConsensusProtocolConfig,
62        // Only required for validator nodes. Observer nodes don't have a protocol keypair.
63        protocol_keypair: Option<ProtocolKeyPair>,
64        network_keypair: NetworkKeyPair,
65        clock: Arc<Clock>,
66        transaction_verifier: Arc<dyn TransactionVerifier>,
67        commit_consumer: CommitConsumerArgs,
68        registry: Registry,
69        // A counter that keeps track of how many times the consensus authority has been booted while the process
70        // has been running. It's useful for making decisions on whether amnesia recovery should run.
71        // When `boot_counter` is 0, `ConsensusAuthority` will initiate the process of amnesia recovery if that's enabled in the parameters.
72        boot_counter: u64,
73    ) -> Self {
74        match network_type {
75            NetworkType::Tonic => {
76                let authority = AuthorityNode::start(
77                    epoch_start_timestamp_ms,
78                    committee,
79                    parameters,
80                    protocol_config,
81                    protocol_keypair,
82                    network_keypair,
83                    clock,
84                    transaction_verifier,
85                    commit_consumer,
86                    registry,
87                    boot_counter,
88                )
89                .await;
90                Self::WithTonic(authority)
91            }
92        }
93    }
94
95    pub async fn stop(self) {
96        match self {
97            Self::WithTonic(authority) => authority.stop().await,
98        }
99    }
100
101    pub fn update_peer_address(
102        &self,
103        network_pubkey: NetworkPublicKey,
104        address: Option<Multiaddr>,
105    ) {
106        match self {
107            Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
108        }
109    }
110
111    pub fn transaction_client(&self) -> Arc<TransactionClient> {
112        match self {
113            Self::WithTonic(authority) => authority.transaction_client(),
114        }
115    }
116
117    #[cfg(test)]
118    fn context(&self) -> &Arc<Context> {
119        match self {
120            Self::WithTonic(authority) => &authority.context,
121        }
122    }
123}
124
/// The network implementation used by the authority. Only a tonic-based stack
/// exists at the moment.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    Tonic,
}
129
/// Enum to handle different subscriber types based on whether the node is a validator or observer
enum SubscriberType<N: NetworkManager> {
    /// Validator nodes subscribe to block streams from the other committee members.
    Validator(Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>),
    /// Observer nodes subscribe to the configured peer(s) via the observer protocol.
    Observer(ObserverSubscriber<N::ObserverClient, ObserverService>),
}
135
136impl<N: NetworkManager> SubscriberType<N> {
137    fn stop(&self) {
138        match self {
139            SubscriberType::Validator(subscriber) => subscriber.stop(),
140            SubscriberType::Observer(subscriber) => subscriber.stop(),
141        }
142    }
143}
144
/// The running consensus node. Owns every long-lived component plus the task
/// handles needed to shut them down in order (see `stop()`).
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager,
{
    context: Arc<Context>,
    // When the node started; used for startup/uptime reporting.
    start_time: Instant,
    // Handed out to callers for submitting transactions.
    transaction_client: Arc<TransactionClient>,
    synchronizer: Arc<SynchronizerHandle>,

    commit_syncer_handle: CommitSyncerHandle,
    // `Some` only on validator nodes; observers do not run the round prober.
    round_prober_handle: Option<RoundProberHandle>,
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    core_thread_handle: CoreThreadHandle,
    subscriber: SubscriberType<N>,
    network_manager: N,
}
161
impl<N> AuthorityNode<N>
where
    N: NetworkManager,
{
    // See comments above ConsensusAuthority::start() for details on the input.
    // A `Some` protocol_keypair means the node boots as a validator (and must be
    // a member of `committee`); `None` means it boots as an observer.
    pub(crate) async fn start(
        epoch_start_timestamp_ms: u64,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ConsensusProtocolConfig,
        protocol_keypair: Option<ProtocolKeyPair>,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumerArgs,
        registry: Registry,
        boot_counter: u64,
    ) -> Self {
        let metrics = initialise_metrics(registry);

        // If a protocol key pair is provided, then this is a validator node.
        let own_index = if let Some(protocol_keypair) = &protocol_keypair {
            // Locate our own entry in the committee by protocol public key.
            let (own_index, _) = committee
                .authorities()
                .find(|(_, a)| a.protocol_key == protocol_keypair.public())
                .expect("Own authority should be among the consensus authorities!");

            let own_hostname = committee.authority(own_index).hostname.clone();
            info!(
                "Starting consensus validator authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                own_index,
                own_hostname,
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );

            metrics
                .node_metrics
                .authority_index
                .with_label_values(&[&own_hostname])
                .set(own_index.value() as i64);
            Some(own_index)
        } else {
            // Otherwise this is an observer node and no index exists for it.
            info!(
                "Starting consensus observer authority, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );
            None
        };

        info!(
            "Consensus authorities: {}",
            committee
                .authorities()
                .map(|(i, a)| format!("{}: {}", i, a.hostname))
                .join(", ")
        );
        info!("Consensus parameters: {:?}", parameters);
        info!("Consensus committee: {:?}", committee);
        // The Context bundles committee, parameters, protocol config, metrics and
        // clock; it is shared (via Arc clones) with nearly every component below.
        let context = Arc::new(Context::new(
            epoch_start_timestamp_ms,
            own_index,
            committee,
            parameters,
            protocol_config,
            metrics,
            clock,
        ));
        let start_time = Instant::now();

        context
            .metrics
            .node_metrics
            .protocol_version
            .set(context.protocol_config.protocol_version() as i64);

        // Channel pair: `tx_client` is returned to callers for submitting
        // transactions, the consumer end feeds Core (validator path only).
        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

        // Network clients are created up front so downstream components can hold
        // them; the servers themselves are started further below.
        let mut network_manager = N::new(context.clone(), network_keypair);
        let validator_client = network_manager.validator_client();
        let observer_client = network_manager.observer_client();

        let synchronizer_client = Arc::new(SynchronizerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));
        let commit_syncer_client = Arc::new(CommitSyncerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));

        // NOTE(review): this panics if the configured db path is not valid UTF-8.
        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
        let store = Arc::new(RocksDBStore::new(
            store_path,
            context.parameters.use_fifo_compaction,
        ));
        // DagState is recovered from the store and shared behind a RwLock.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_verifier = Arc::new(SignedBlockVerifier::new(
            context.clone(),
            transaction_verifier,
        ));

        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());

        // Only sync last known own block if we are a validator and it's the first boot.
        let sync_last_known_own_block = boot_counter == 0
            && !context
                .parameters
                .sync_last_known_own_block_timeout
                .is_zero()
            && context.is_validator();
        info!(
            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
            sync_last_known_own_block,
            boot_counter,
            context.parameters.sync_last_known_own_block_timeout
        );

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        // The monitor is taken before `commit_consumer` is moved into the observer.
        let commit_consumer_monitor = commit_consumer.monitor();
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Seed the round tracker with the highest cached block round per authority.
        let initial_received_rounds = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX)
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
            context.clone(),
            initial_received_rounds,
        )));

        // To avoid accidentally leaking the private key, the protocol key pair should only be
        // kept in Core.
        let core = if context.is_validator() {
            Core::new_validator(
                context.clone(),
                leader_schedule,
                tx_consumer,
                transaction_vote_tracker.clone(),
                block_manager,
                commit_observer,
                core_signals,
                protocol_keypair.expect("protocol keypair is required when running as validator"),
                dag_state.clone(),
                sync_last_known_own_block,
                round_tracker.clone(),
            )
        } else {
            // Observer Core: no transaction consumer, keypair, or round tracker.
            Core::new_observer(
                context.clone(),
                leader_schedule,
                block_manager,
                commit_observer,
                core_signals,
                dag_state.clone(),
            )
        };

        // Core runs on its own thread; all other components talk to it through
        // the dispatcher.
        let (core_dispatcher, core_thread_handle) =
            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
        let core_dispatcher = Arc::new(core_dispatcher);
        let leader_timeout_handle =
            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());

        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // Create the PeersPool
        let peers_pool = Arc::new(PeersPool::new(context.clone()));

        let synchronizer = Synchronizer::start(
            synchronizer_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            sync_last_known_own_block,
        );

        let commit_syncer_handle = CommitSyncer::new(
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            commit_syncer_client.clone(),
            dag_state.clone(),
            peers_pool.clone(),
        )
        .start();

        // Create BlockSyncService that will be shared by both AuthorityService and ObserverService
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));

        // Role-specific wiring: validators serve + subscribe to other validators
        // and run the round prober; observers only subscribe to configured peers.
        let (subscriber, round_prober_handle) = if context.is_validator() {
            let authority_service = Arc::new(AuthorityService::new(
                context.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                round_tracker.clone(),
                synchronizer.clone(),
                core_dispatcher.clone(),
                signals_receivers.block_broadcast_receiver(),
                transaction_vote_tracker.clone(),
                dag_state.clone(),
                block_sync_service.clone(),
            ));

            // Start the validator server if this is a validator node.
            network_manager
                .start_validator_server(authority_service.clone())
                .await;

            // Validator node: subscribe to all other validators
            let s = Subscriber::new(
                context.clone(),
                validator_client.clone(),
                authority_service.clone(),
                dag_state.clone(),
            );
            for (peer, _) in context.committee.authorities() {
                if peer != context.own_index {
                    s.subscribe(peer);
                }
            }

            // Start the round prober
            let round_prober_handle = Some(
                RoundProber::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    round_tracker.clone(),
                    dag_state.clone(),
                    validator_client,
                )
                .start(),
            );

            // Start the observer server if the observer server is enabled in the parameters.
            if context.parameters.observer.is_server_enabled() {
                let observer_service = Arc::new(ObserverService::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    dag_state.clone(),
                    signals_receivers.accepted_block_broadcast_receiver(),
                    block_verifier,
                    commit_vote_monitor.clone(),
                    transaction_vote_tracker.clone(),
                    synchronizer.clone(),
                    block_sync_service.clone(),
                ));
                network_manager
                    .start_observer_server(observer_service)
                    .await;
            }

            (SubscriberType::Validator(s), round_prober_handle)
        } else {
            // Observer node: subscribe to specified peer(s) using ObserverSubscriber
            let observer_client = network_manager.observer_client();
            let observer_service = Arc::new(ObserverService::new(
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                signals_receivers.accepted_block_broadcast_receiver(),
                block_verifier,
                commit_vote_monitor.clone(),
                transaction_vote_tracker.clone(),
                synchronizer.clone(),
                block_sync_service.clone(),
            ));

            let observer_subscriber = ObserverSubscriber::new(
                context.clone(),
                observer_client,
                observer_service.clone(),
                dag_state.clone(),
            );

            network_manager
                .start_observer_server(observer_service)
                .await;

            // Subscribe to peers specified in the configuration
            // For now get the first peer from the list to connect to.
            // TODO: support multiple peers - as in choose/detect which one to connect to.
            for peer_record in context.parameters.observer.peers.iter().take(1) {
                // Peers that are committee members are addressed by validator
                // index; anyone else is addressed by their network public key.
                let peer_id = if let Some((index, _)) = context
                    .committee
                    .authorities()
                    .find(|(_, authority)| authority.network_key == peer_record.public_key)
                {
                    PeerId::Validator(index)
                } else {
                    PeerId::Observer(Box::new(peer_record.public_key.clone()))
                };

                info!("Observer subscribing to peer: {:?}", peer_id);
                observer_subscriber.subscribe(peer_id);
            }

            (SubscriberType::Observer(observer_subscriber), None)
        };

        info!(
            "Consensus authority started, took {:?}",
            start_time.elapsed()
        );

        Self {
            context,
            start_time,
            transaction_client: Arc::new(tx_client),
            synchronizer,
            commit_syncer_handle,
            round_prober_handle,
            leader_timeout_handle,
            core_thread_handle,
            subscriber,
            network_manager,
        }
    }

    /// Shuts the node down, stopping components in dependency order: everything
    /// that calls into Core first, then Core itself, then the network layer.
    pub(crate) async fn stop(mut self) {
        info!(
            "Stopping authority. Total run time: {:?}",
            self.start_time.elapsed()
        );

        // First shutdown components calling into Core.
        if let Err(e) = self.synchronizer.stop().await {
            // Re-raise panics from the synchronizer task instead of swallowing them.
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
            warn!(
                "Failed to stop synchronizer when shutting down consensus: {:?}",
                e
            );
        };
        self.commit_syncer_handle.stop().await;
        // Round prober only exists on validator nodes.
        if let Some(round_prober_handle) = self.round_prober_handle {
            round_prober_handle.stop().await;
        }
        self.leader_timeout_handle.stop().await;
        // Shutdown Core to stop block productions and broadcast.
        self.core_thread_handle.stop().await;
        // Stop block subscriptions before stopping network server.
        self.subscriber.stop();
        self.network_manager.stop().await;

        self.context
            .metrics
            .node_metrics
            .uptime
            .observe(self.start_time.elapsed().as_secs_f64());
    }

    /// Returns the client used to submit transactions to this node.
    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
        self.transaction_client.clone()
    }

    /// Updates the network address for the committee member with the given
    /// network public key, then re-subscribes to force a reconnection. Ignores
    /// keys that do not belong to the committee (with a warning).
    pub(crate) fn update_peer_address(
        &self,
        network_pubkey: NetworkPublicKey,
        address: Option<Multiaddr>,
    ) {
        // Find the peer index for this network key
        let Some(peer) = self
            .context
            .committee
            .authorities()
            .find(|(_, authority)| authority.network_key == network_pubkey)
            .map(|(index, _)| index)
        else {
            warn!(
                "Network public key {:?} not found in committee, ignoring address update",
                network_pubkey
            );
            return;
        };

        // Update the address in the network manager
        self.network_manager.update_peer_address(peer, address);

        // Re-subscribe to the peer to force reconnection with new address
        if peer != self.context.own_index {
            info!("Re-subscribing to peer {} after address update", peer);
            match &self.subscriber {
                SubscriberType::Validator(s) => s.subscribe(peer),
                SubscriberType::Observer(s) => {
                    // For observer, create a PeerId for the validator
                    s.subscribe(PeerId::Validator(peer));
                }
            }
        }
    }
}
603
604#[cfg(test)]
605mod tests {
606    #![allow(non_snake_case)]
607
608    use std::{
609        collections::{BTreeMap, BTreeSet},
610        sync::Arc,
611        time::Duration,
612    };
613
614    use consensus_config::{
615        AuthorityIndex, ObserverParameters, Parameters, PeerRecord, local_committee_and_keys,
616    };
617    use mysten_metrics::RegistryService;
618    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
619    use prometheus::Registry;
620    use rand::{SeedableRng, rngs::StdRng};
621    use rstest::rstest;
622    use tempfile::TempDir;
623    use tokio::time::{sleep, timeout};
624    use typed_store::DBMetrics;
625
626    use super::*;
627    use crate::{
628        CommittedSubDag,
629        block::{BlockAPI as _, GENESIS_ROUND},
630        transaction::NoopTransactionVerifier,
631    };
632
    // Boots a single-validator authority, checks its context, and shuts it down.
    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
    ) {
        // Single-member committee: this node is the entire validator set.
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        // keep() persists the temp directory so the DB path outlives the guard.
        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            committee,
            parameters,
            ConsensusProtocolConfig::for_testing(),
            // Providing a protocol keypair makes this a validator node.
            Some(protocol_keypair),
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }
676
    // Boots an observer node (no protocol keypair), verifies the observer-specific
    // context values, and shuts it down.
    #[rstest]
    #[tokio::test]
    async fn test_observer_start_and_stop(#[values(NetworkType::Tonic)] network_type: NetworkType) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        // keep() persists the temp directory so the DB path outlives the guard.
        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        // Use any network keypair for the observer, it doesn't need to match a committee member
        let network_keypair = keypairs[0].0.clone();

        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);

        let observer = ConsensusAuthority::start(
            network_type,
            0,
            committee.clone(),
            parameters,
            ConsensusProtocolConfig::for_testing(),
            None, // No protocol keypair for observer node
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        // Give the observer's background tasks a moment to run before asserting.
        sleep(Duration::from_secs(2)).await;

        // Observer nodes have own_index set to MAX as a special value
        assert_eq!(observer.context().own_index, AuthorityIndex::MAX);
        assert_eq!(observer.context().committee.epoch(), 0);
        assert_eq!(observer.context().committee.size(), 1);
        assert!(!observer.context().is_validator());

        observer.stop().await;
    }
721
722    // TODO: build AuthorityFixture.
723    // Spins up a committee of authorities and an observer node that connects to authority 0.
724    // Verifies that the network is progressing, advancing rounds and commits. It also verifies
725    // that the Observer node is receiving blocks from the network.
726    #[rstest]
727    #[tokio::test(flavor = "current_thread")]
728    async fn test_authority_committee(
729        #[values(NetworkType::Tonic)] network_type: NetworkType,
730        #[values(5, 10)] gc_depth: u32,
731    ) {
732        telemetry_subscribers::init_for_testing();
733        let db_registry = Registry::new();
734        DBMetrics::init(RegistryService::new(db_registry));
735
736        const NUM_OF_AUTHORITIES: usize = 4;
737        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
738        let mut protocol_config = ConsensusProtocolConfig::for_testing();
739        protocol_config.set_gc_depth_for_testing(gc_depth);
740
741        let temp_dirs = (0..NUM_OF_AUTHORITIES)
742            .map(|_| TempDir::new().unwrap())
743            .collect::<Vec<_>>();
744
745        let mut commit_receivers = Vec::with_capacity(committee.size());
746        let mut authorities = Vec::with_capacity(committee.size());
747        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
748
749        // Use a unique port based on gc_depth to avoid conflicts between parallel tests
750        let observer_server_port = 8900 + gc_depth as u16;
751
752        // Create authorities with observer server enabled for authority 0
753        let mut authority_0_network_key = None;
754        for (index, authority_info) in committee.authorities() {
755            let (authority, commit_receiver) = if index.value() == 0 {
756                // Save authority 0's network key for Observer connection
757                authority_0_network_key = Some(authority_info.network_key.clone());
758                // Enable observer server for authority 0
759                make_authority_with_observer_server(
760                    index,
761                    &temp_dirs[index.value()],
762                    committee.clone(),
763                    keypairs.clone(),
764                    network_type,
765                    boot_counters[index],
766                    protocol_config.clone(),
767                    Some(observer_server_port),
768                )
769                .await
770            } else {
771                make_authority(
772                    index,
773                    &temp_dirs[index.value()],
774                    committee.clone(),
775                    keypairs.clone(),
776                    network_type,
777                    boot_counters[index],
778                    protocol_config.clone(),
779                )
780                .await
781            };
782            boot_counters[index] += 1;
783            commit_receivers.push(commit_receiver);
784            authorities.push(authority);
785        }
786
787        // Create an Observer node that connects to authority 0
788        let observer_temp_dir = TempDir::new().unwrap();
789        let mut rng = StdRng::from_seed([99; 32]);
790        let observer_network_keypair = consensus_config::NetworkKeyPair::generate(&mut rng);
791
792        let observer_parameters = Parameters {
793            db_path: observer_temp_dir.path().to_path_buf(),
794            observer: ObserverParameters {
795                // Configure Observer to connect to authority 0
796                peers: vec![PeerRecord {
797                    public_key: authority_0_network_key
798                        .clone()
799                        .expect("Authority 0 network key should be set"),
800                    address: format!("/ip4/127.0.0.1/udp/{}", observer_server_port)
801                        .parse()
802                        .unwrap(),
803                }],
804                ..Default::default()
805            },
806            ..Default::default()
807        };
808
809        let (observer_commit_consumer, observer_commit_receiver) = CommitConsumerArgs::new(0, 0);
810        let observer = ConsensusAuthority::start(
811            network_type,
812            0,
813            committee.clone(),
814            observer_parameters,
815            protocol_config.clone(),
816            None, // No protocol keypair for observer
817            observer_network_keypair,
818            Arc::new(Clock::default()),
819            Arc::new(NoopTransactionVerifier {}),
820            observer_commit_consumer,
821            Registry::new(),
822            0,
823        )
824        .await;
825        // The relevant endpoints are now implemented for the synchronizer and commit_syncer components, so the Observer node should be able to catch up and
826        // fetch blocks beyond the latest ones that are fetched from the stream.
827        commit_receivers.push(observer_commit_receiver);
828
829        // Give Observer more time to connect and sync
830        sleep(Duration::from_secs(5)).await;
831
832        const NUM_TRANSACTIONS: u8 = 15;
833        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
834        for i in 0..NUM_TRANSACTIONS {
835            let txn = vec![i; 16];
836            submitted_transactions.insert(txn.clone());
837            authorities[i as usize % authorities.len()]
838                .transaction_client()
839                .submit(vec![txn])
840                .await
841                .unwrap();
842        }
843
844        for receiver in &mut commit_receivers {
845            let mut expected_transactions = submitted_transactions.clone();
846            loop {
847                let committed_subdag =
848                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
849                        .await
850                        .unwrap()
851                        .unwrap();
852                for b in committed_subdag.blocks {
853                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
854                        assert!(
855                            expected_transactions.remove(&txn),
856                            "Transaction not submitted or already seen: {:?}",
857                            txn
858                        );
859                    }
860                }
861                if expected_transactions.is_empty() {
862                    break;
863                }
864            }
865        }
866
867        // Stop authority 1.
868        let index = committee.to_authority_index(1).unwrap();
869        authorities.remove(index.value()).stop().await;
870        sleep(Duration::from_secs(10)).await;
871
872        // Restart authority 1 and let it run.
873        let (authority, commit_receiver) = make_authority(
874            index,
875            &temp_dirs[index.value()],
876            committee.clone(),
877            keypairs.clone(),
878            network_type,
879            boot_counters[index],
880            protocol_config.clone(),
881        )
882        .await;
883        boot_counters[index] += 1;
884        commit_receivers[index] = commit_receiver;
885        authorities.insert(index.value(), authority);
886        sleep(Duration::from_secs(10)).await;
887
888        // Verify that the Observer node is running
889        // TODO: The actual block processing for observers is not fully implemented yet
890        // for now we just verify that blocks are received and the number of received blocks is not far from
891        // the number of blocks sent by authority 0.
892        let observer_context = observer.context();
893        assert!(
894            observer_context.is_observer(),
895            "It should be an observer node"
896        );
897
898        // Get the total verified_blocks from authority 0 (sum across all sending authorities)
899        let authority_0 = &authorities[0];
900        let authority_0_context = authority_0.context();
901        let mut authority_0_total_verified_blocks = 0;
902
903        // Sum verified_blocks from all authorities as seen by authority 0
904        for (_, authority_info) in committee.authorities() {
905            if let Ok(metric) = authority_0_context
906                .metrics
907                .node_metrics
908                .verified_blocks
909                .get_metric_with_label_values(&[&authority_info.hostname])
910            {
911                authority_0_total_verified_blocks += metric.get();
912                println!(
913                    "authority_info.hostname: {}, metric: {:?}",
914                    authority_info.hostname, authority_0_total_verified_blocks
915                );
916            }
917        }
918
919        let mut authority_0_total_proposed_blocks = 0;
920        for force in [true, false] {
921            if let Ok(metric) = authority_0_context
922                .metrics
923                .node_metrics
924                .proposed_blocks
925                .get_metric_with_label_values(&[&force.to_string()])
926            {
927                authority_0_total_proposed_blocks += metric.get();
928            }
929        }
930
931        authority_0_total_verified_blocks += authority_0_total_proposed_blocks;
932
933        // Sum verified_blocks from all authorities as seen by the observer
934        let mut observer_received_blocks = 0;
935        for (_, authority_info) in committee.authorities() {
936            if let Ok(metric) = observer_context
937                .metrics
938                .node_metrics
939                .verified_blocks
940                .get_metric_with_label_values(&[&authority_info.hostname])
941            {
942                observer_received_blocks += metric.get();
943            }
944        }
945
946        // Compare the values - they should be related but might not be exactly equal
947        // due to timing and the observer connecting mid-stream
948        assert!(
949            observer_received_blocks > 0,
950            "Observer should have received at least some blocks, got: {}",
951            observer_received_blocks
952        );
953
954        println!(
955            "authority_0_total_verified_blocks: {}, observer_received_blocks: {}",
956            authority_0_total_verified_blocks, observer_received_blocks
957        );
958
959        const TOLERANCE: u64 = 20;
960        assert!(
961            authority_0_total_verified_blocks - observer_received_blocks <= TOLERANCE,
962            "The number of blocks received by the observer ({}) should be close to the number of blocks verified by authority 0 ({})",
963            observer_received_blocks,
964            authority_0_total_verified_blocks,
965        );
966
967        // Stop observer first
968        observer.stop().await;
969
970        // Stop all authorities and exit.
971        for authority in authorities {
972            authority.stop().await;
973        }
974    }
975
976    #[rstest]
977    #[tokio::test(flavor = "current_thread")]
978    async fn test_small_committee(
979        #[values(NetworkType::Tonic)] network_type: NetworkType,
980        #[values(1, 2, 3)] num_authorities: usize,
981    ) {
982        telemetry_subscribers::init_for_testing();
983        let db_registry = Registry::new();
984        DBMetrics::init(RegistryService::new(db_registry));
985
986        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
987        let protocol_config = ConsensusProtocolConfig::for_testing();
988
989        let temp_dirs = (0..num_authorities)
990            .map(|_| TempDir::new().unwrap())
991            .collect::<Vec<_>>();
992
993        let mut output_receivers = Vec::with_capacity(committee.size());
994        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
995        let mut boot_counters = vec![0; num_authorities];
996
997        for (index, _authority_info) in committee.authorities() {
998            let (authority, commit_receiver) = make_authority(
999                index,
1000                &temp_dirs[index.value()],
1001                committee.clone(),
1002                keypairs.clone(),
1003                network_type,
1004                boot_counters[index],
1005                protocol_config.clone(),
1006            )
1007            .await;
1008            boot_counters[index] += 1;
1009            output_receivers.push(commit_receiver);
1010            authorities.push(authority);
1011        }
1012
1013        const NUM_TRANSACTIONS: u8 = 15;
1014        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
1015        for i in 0..NUM_TRANSACTIONS {
1016            let txn = vec![i; 16];
1017            submitted_transactions.insert(txn.clone());
1018            authorities[i as usize % authorities.len()]
1019                .transaction_client()
1020                .submit(vec![txn])
1021                .await
1022                .unwrap();
1023        }
1024
1025        for receiver in &mut output_receivers {
1026            let mut expected_transactions = submitted_transactions.clone();
1027            loop {
1028                let committed_subdag =
1029                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
1030                        .await
1031                        .unwrap()
1032                        .unwrap();
1033                for b in committed_subdag.blocks {
1034                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
1035                        assert!(
1036                            expected_transactions.remove(&txn),
1037                            "Transaction not submitted or already seen: {:?}",
1038                            txn
1039                        );
1040                    }
1041                }
1042                if expected_transactions.is_empty() {
1043                    break;
1044                }
1045            }
1046        }
1047
1048        // Stop authority 0.
1049        let index = committee.to_authority_index(0).unwrap();
1050        authorities.remove(index.value()).stop().await;
1051        sleep(Duration::from_secs(10)).await;
1052
1053        // Restart authority 0 and let it run.
1054        let (authority, commit_receiver) = make_authority(
1055            index,
1056            &temp_dirs[index.value()],
1057            committee.clone(),
1058            keypairs.clone(),
1059            network_type,
1060            boot_counters[index],
1061            protocol_config.clone(),
1062        )
1063        .await;
1064        boot_counters[index] += 1;
1065        output_receivers[index] = commit_receiver;
1066        authorities.insert(index.value(), authority);
1067        sleep(Duration::from_secs(10)).await;
1068
1069        // Stop all authorities and exit.
1070        for authority in authorities {
1071            authority.stop().await;
1072        }
1073    }
1074
    /// Verifies that an authority which loses its whole DB ("amnesia") can
    /// rejoin the committee: it must retry recovery while fewer than f+1 peers
    /// are reachable, and complete it once enough peers come back online.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ConsensusProtocolConfig::for_testing();
        protocol_config.set_gc_depth_for_testing(gc_depth);

        // Boot all four authorities, each with its own DB directory.
        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                NetworkType::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Now we take the receiver of authority 1 and we wait until we see at least one block committed from this authority
        // We wait until we see at least one committed block authored from this authority. That way we'll be 100% sure that
        // at least one block has been proposed and successfully received by a quorum of nodes.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authority 1 & 2.
        // * Authority 1 will be used to wipe out their DB and practically "force" the amnesia recovery.
        // * Authority 2 is stopped in order to simulate less than f+1 availability, which will make
        //   authority 1 keep retrying during amnesia recovery until it finally manages to get back
        //   f+1 responses once authority 2 is up and running again.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Authority 1: create a new directory to simulate amnesia. The node will start having participated previously
        // to consensus but now will attempt to synchronize the last own block and recover from there. It won't be able
        // to do that successfully as authority 2 is still down.
        let dir = TempDir::new().unwrap();
        // We do reset the boot counter for this one to simulate a "binary" restart
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            NetworkType::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Now spin up authority 2 using its earlier directory - so no amnesia recovery should be forced here.
        // Authority 1 should be able to recover from amnesia successfully.
        let (authority, _commit_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            NetworkType::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // We wait until we see at least one committed block authored from authority 1 on its fresh
        // commit stream - this is the signal that amnesia recovery succeeded and the node proposes again.
        'outer: while let Some(result) = commit_receiver.recv().await {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }
1187
1188    // TODO: create a fixture
1189    async fn make_authority(
1190        index: AuthorityIndex,
1191        db_dir: &TempDir,
1192        committee: Committee,
1193        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1194        network_type: NetworkType,
1195        boot_counter: u64,
1196        protocol_config: ConsensusProtocolConfig,
1197    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1198        make_authority_with_observer_server(
1199            index,
1200            db_dir,
1201            committee,
1202            keypairs,
1203            network_type,
1204            boot_counter,
1205            protocol_config,
1206            None, // No observer server port
1207        )
1208        .await
1209    }
1210
1211    async fn make_authority_with_observer_server(
1212        index: AuthorityIndex,
1213        db_dir: &TempDir,
1214        committee: Committee,
1215        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1216        network_type: NetworkType,
1217        boot_counter: u64,
1218        protocol_config: ConsensusProtocolConfig,
1219        observer_server_port: Option<u16>,
1220    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1221        let registry = Registry::new();
1222
1223        // Cache less blocks to exercise commit sync.
1224        let mut parameters = Parameters {
1225            db_path: db_dir.path().to_path_buf(),
1226            dag_state_cached_rounds: 5,
1227            commit_sync_parallel_fetches: 2,
1228            commit_sync_batch_size: 3,
1229            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1230            ..Default::default()
1231        };
1232
1233        // Enable observer server if port is provided
1234        if let Some(port) = observer_server_port {
1235            parameters.observer.server_port = Some(port);
1236        }
1237
1238        let txn_verifier = NoopTransactionVerifier {};
1239
1240        let protocol_keypair = keypairs[index].1.clone();
1241        let network_keypair = keypairs[index].0.clone();
1242
1243        let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
1244
1245        let authority = ConsensusAuthority::start(
1246            network_type,
1247            0,
1248            committee,
1249            parameters,
1250            protocol_config,
1251            Some(protocol_keypair),
1252            network_keypair,
1253            Arc::new(Clock::default()),
1254            Arc::new(txn_verifier),
1255            commit_consumer,
1256            registry,
1257            boot_counter,
1258        )
1259        .await;
1260
1261        (authority, commit_receiver)
1262    }
1263}