consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7    AuthorityIndex, ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
8    NetworkPublicKey, Parameters, ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18    BlockAPI as _, CommitConsumerArgs,
19    authority_service::AuthorityService,
20    block_manager::BlockManager,
21    block_verifier::SignedBlockVerifier,
22    commit_observer::CommitObserver,
23    commit_syncer::{CommitSyncer, CommitSyncerHandle},
24    commit_vote_monitor::CommitVoteMonitor,
25    context::{Clock, Context},
26    core::{Core, CoreSignals},
27    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
28    dag_state::DagState,
29    leader_schedule::LeaderSchedule,
30    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
31    metrics::initialise_metrics,
32    network::{
33        CommitSyncerClient, NetworkManager, SynchronizerClient, tonic_network::TonicManager,
34    },
35    observer_service::ObserverService,
36    round_prober::{RoundProber, RoundProberHandle},
37    round_tracker::RoundTracker,
38    storage::rocksdb_store::RocksDBStore,
39    subscriber::Subscriber,
40    synchronizer::{Synchronizer, SynchronizerHandle},
41    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
42    transaction_vote_tracker::TransactionVoteTracker,
43};
44
45/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
46/// It hides the details of the implementation from the caller, MysticetiManager.
47#[allow(private_interfaces)]
48pub enum ConsensusAuthority {
49    WithTonic(AuthorityNode<TonicManager>),
50}
51
52impl ConsensusAuthority {
53    pub async fn start(
54        network_type: NetworkType,
55        epoch_start_timestamp_ms: u64,
56        own_index: AuthorityIndex,
57        committee: Committee,
58        parameters: Parameters,
59        protocol_config: ConsensusProtocolConfig,
60        protocol_keypair: ProtocolKeyPair,
61        network_keypair: NetworkKeyPair,
62        clock: Arc<Clock>,
63        transaction_verifier: Arc<dyn TransactionVerifier>,
64        commit_consumer: CommitConsumerArgs,
65        registry: Registry,
66        // A counter that keeps track of how many times the consensus authority has been booted while the process
67        // has been running. It's useful for making decisions on whether amnesia recovery should run.
68        // When `boot_counter` is 0, `ConsensusAuthority` will initiate the process of amnesia recovery if that's enabled in the parameters.
69        boot_counter: u64,
70    ) -> Self {
71        match network_type {
72            NetworkType::Tonic => {
73                let authority = AuthorityNode::start(
74                    epoch_start_timestamp_ms,
75                    own_index,
76                    committee,
77                    parameters,
78                    protocol_config,
79                    protocol_keypair,
80                    network_keypair,
81                    clock,
82                    transaction_verifier,
83                    commit_consumer,
84                    registry,
85                    boot_counter,
86                )
87                .await;
88                Self::WithTonic(authority)
89            }
90        }
91    }
92
93    pub async fn stop(self) {
94        match self {
95            Self::WithTonic(authority) => authority.stop().await,
96        }
97    }
98
99    pub fn update_peer_address(
100        &self,
101        network_pubkey: NetworkPublicKey,
102        address: Option<Multiaddr>,
103    ) {
104        match self {
105            Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
106        }
107    }
108
109    pub fn transaction_client(&self) -> Arc<TransactionClient> {
110        match self {
111            Self::WithTonic(authority) => authority.transaction_client(),
112        }
113    }
114
115    #[cfg(test)]
116    fn context(&self) -> &Arc<Context> {
117        match self {
118            Self::WithTonic(authority) => &authority.context,
119        }
120    }
121}
122
/// Selects which network stack a `ConsensusAuthority` is started with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkType {
    /// Networking provided by `TonicManager`.
    Tonic,
}
127
128pub(crate) struct AuthorityNode<N>
129where
130    N: NetworkManager,
131{
132    context: Arc<Context>,
133    start_time: Instant,
134    transaction_client: Arc<TransactionClient>,
135    synchronizer: Arc<SynchronizerHandle>,
136
137    commit_syncer_handle: CommitSyncerHandle,
138    round_prober_handle: RoundProberHandle,
139    leader_timeout_handle: LeaderTimeoutTaskHandle,
140    core_thread_handle: CoreThreadHandle,
141    subscriber: Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>,
142    network_manager: N,
143}
144
145impl<N> AuthorityNode<N>
146where
147    N: NetworkManager,
148{
149    // See comments above ConsensusAuthority::start() for details on the input.
150    pub(crate) async fn start(
151        epoch_start_timestamp_ms: u64,
152        own_index: AuthorityIndex,
153        committee: Committee,
154        parameters: Parameters,
155        protocol_config: ConsensusProtocolConfig,
156        protocol_keypair: ProtocolKeyPair,
157        network_keypair: NetworkKeyPair,
158        clock: Arc<Clock>,
159        transaction_verifier: Arc<dyn TransactionVerifier>,
160        commit_consumer: CommitConsumerArgs,
161        registry: Registry,
162        boot_counter: u64,
163    ) -> Self {
164        assert!(
165            committee.is_valid_index(own_index),
166            "Invalid own index {}",
167            own_index
168        );
169        let own_hostname = committee.authority(own_index).hostname.clone();
170        info!(
171            "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
172            own_index,
173            own_hostname,
174            protocol_config.protocol_version(),
175            epoch_start_timestamp_ms,
176            boot_counter,
177            commit_consumer.replay_after_commit_index,
178            commit_consumer.consumer_last_processed_commit_index
179        );
180        info!(
181            "Consensus authorities: {}",
182            committee
183                .authorities()
184                .map(|(i, a)| format!("{}: {}", i, a.hostname))
185                .join(", ")
186        );
187        info!("Consensus parameters: {:?}", parameters);
188        info!("Consensus committee: {:?}", committee);
189        let context = Arc::new(Context::new(
190            epoch_start_timestamp_ms,
191            own_index,
192            committee,
193            parameters,
194            protocol_config,
195            initialise_metrics(registry),
196            clock,
197        ));
198        let start_time = Instant::now();
199
200        context
201            .metrics
202            .node_metrics
203            .authority_index
204            .with_label_values(&[&own_hostname])
205            .set(context.own_index.value() as i64);
206        context
207            .metrics
208            .node_metrics
209            .protocol_version
210            .set(context.protocol_config.protocol_version() as i64);
211
212        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
213        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
214
215        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
216
217        let mut network_manager = N::new(context.clone(), network_keypair);
218        let validator_client = network_manager.validator_client();
219
220        let synchronizer_client = Arc::new(SynchronizerClient::<
221            N::ValidatorClient,
222            N::ObserverClient,
223        >::new(
224            context.clone(),
225            Some(validator_client.clone()),
226            None, // TODO: set observer client if want to talk to a peer's observer server.
227        ));
228        let commit_syncer_client = Arc::new(CommitSyncerClient::<
229            N::ValidatorClient,
230            N::ObserverClient,
231        >::new(
232            context.clone(),
233            Some(validator_client.clone()),
234            None, // TODO: set observer client if want to talk to a peer's observer server.
235        ));
236
237        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
238        let use_fifo_compaction = context.parameters.use_fifo_compaction
239            && (context.protocol_config.chain() != ChainType::Mainnet
240                || context.own_index.value().is_multiple_of(10));
241        let store = Arc::new(RocksDBStore::new(store_path, use_fifo_compaction));
242        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
243
244        let block_verifier = Arc::new(SignedBlockVerifier::new(
245            context.clone(),
246            transaction_verifier,
247        ));
248
249        let transaction_vote_tracker =
250            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
251
252        let sync_last_known_own_block = boot_counter == 0
253            && !context
254                .parameters
255                .sync_last_known_own_block_timeout
256                .is_zero();
257        info!(
258            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
259            sync_last_known_own_block,
260            boot_counter,
261            context.parameters.sync_last_known_own_block_timeout
262        );
263
264        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
265
266        let leader_schedule = Arc::new(LeaderSchedule::from_store(
267            context.clone(),
268            dag_state.clone(),
269        ));
270
271        let commit_consumer_monitor = commit_consumer.monitor();
272        let commit_observer = CommitObserver::new(
273            context.clone(),
274            commit_consumer,
275            dag_state.clone(),
276            transaction_vote_tracker.clone(),
277            leader_schedule.clone(),
278        )
279        .await;
280
281        let initial_received_rounds = dag_state
282            .read()
283            .get_last_cached_block_per_authority(Round::MAX)
284            .into_iter()
285            .map(|(block, _)| block.round())
286            .collect::<Vec<_>>();
287        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
288            context.clone(),
289            initial_received_rounds,
290        )));
291
292        // To avoid accidentally leaking the private key, the protocol key pair should only be
293        // kept in Core.
294        let core = Core::new(
295            context.clone(),
296            leader_schedule,
297            tx_consumer,
298            transaction_vote_tracker.clone(),
299            block_manager,
300            commit_observer,
301            core_signals,
302            protocol_keypair,
303            dag_state.clone(),
304            sync_last_known_own_block,
305            round_tracker.clone(),
306        );
307
308        let (core_dispatcher, core_thread_handle) =
309            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
310        let core_dispatcher = Arc::new(core_dispatcher);
311        let leader_timeout_handle =
312            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
313
314        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
315
316        let synchronizer = Synchronizer::start(
317            synchronizer_client.clone(),
318            context.clone(),
319            core_dispatcher.clone(),
320            commit_vote_monitor.clone(),
321            block_verifier.clone(),
322            transaction_vote_tracker.clone(),
323            round_tracker.clone(),
324            dag_state.clone(),
325            sync_last_known_own_block,
326        );
327
328        let commit_syncer_handle = CommitSyncer::new(
329            context.clone(),
330            core_dispatcher.clone(),
331            commit_vote_monitor.clone(),
332            commit_consumer_monitor.clone(),
333            block_verifier.clone(),
334            transaction_vote_tracker.clone(),
335            round_tracker.clone(),
336            commit_syncer_client.clone(),
337            dag_state.clone(),
338        )
339        .start();
340
341        let round_prober_handle = RoundProber::new(
342            context.clone(),
343            core_dispatcher.clone(),
344            round_tracker.clone(),
345            dag_state.clone(),
346            validator_client.clone(),
347        )
348        .start();
349
350        let network_service = Arc::new(AuthorityService::new(
351            context.clone(),
352            block_verifier,
353            commit_vote_monitor,
354            round_tracker.clone(),
355            synchronizer.clone(),
356            core_dispatcher,
357            signals_receivers.block_broadcast_receiver(),
358            transaction_vote_tracker,
359            dag_state.clone(),
360            store.clone(),
361        ));
362
363        let subscriber = {
364            let s = Subscriber::new(
365                context.clone(),
366                validator_client,
367                network_service.clone(),
368                dag_state.clone(),
369            );
370            for (peer, _) in context.committee.authorities() {
371                if peer != context.own_index {
372                    s.subscribe(peer);
373                }
374            }
375            s
376        };
377
378        network_manager
379            .start_validator_server(network_service.clone())
380            .await;
381        if context.parameters.tonic.is_observer_server_enabled() {
382            let observer_service = Arc::new(ObserverService::new(
383                context.clone(),
384                dag_state.clone(),
385                signals_receivers.accepted_block_broadcast_receiver(),
386            ));
387            network_manager
388                .start_observer_server(observer_service)
389                .await;
390        }
391
392        info!(
393            "Consensus authority started, took {:?}",
394            start_time.elapsed()
395        );
396
397        Self {
398            context,
399            start_time,
400            transaction_client: Arc::new(tx_client),
401            synchronizer,
402            commit_syncer_handle,
403            round_prober_handle,
404            leader_timeout_handle,
405            core_thread_handle,
406            subscriber,
407            network_manager,
408        }
409    }
410
411    pub(crate) async fn stop(mut self) {
412        info!(
413            "Stopping authority. Total run time: {:?}",
414            self.start_time.elapsed()
415        );
416
417        // First shutdown components calling into Core.
418        if let Err(e) = self.synchronizer.stop().await {
419            if e.is_panic() {
420                std::panic::resume_unwind(e.into_panic());
421            }
422            warn!(
423                "Failed to stop synchronizer when shutting down consensus: {:?}",
424                e
425            );
426        };
427        self.commit_syncer_handle.stop().await;
428        self.round_prober_handle.stop().await;
429        self.leader_timeout_handle.stop().await;
430        // Shutdown Core to stop block productions and broadcast.
431        self.core_thread_handle.stop().await;
432        // Stop block subscriptions before stopping network server.
433        self.subscriber.stop();
434        self.network_manager.stop().await;
435
436        self.context
437            .metrics
438            .node_metrics
439            .uptime
440            .observe(self.start_time.elapsed().as_secs_f64());
441    }
442
443    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
444        self.transaction_client.clone()
445    }
446
447    pub(crate) fn update_peer_address(
448        &self,
449        network_pubkey: NetworkPublicKey,
450        address: Option<Multiaddr>,
451    ) {
452        // Find the peer index for this network key
453        let Some(peer) = self
454            .context
455            .committee
456            .authorities()
457            .find(|(_, authority)| authority.network_key == network_pubkey)
458            .map(|(index, _)| index)
459        else {
460            warn!(
461                "Network public key {:?} not found in committee, ignoring address update",
462                network_pubkey
463            );
464            return;
465        };
466
467        // Update the address in the network manager
468        self.network_manager.update_peer_address(peer, address);
469
470        // Re-subscribe to the peer to force reconnection with new address
471        if peer != self.context.own_index {
472            info!("Re-subscribing to peer {} after address update", peer);
473            self.subscriber.subscribe(peer);
474        }
475    }
476}
477
#[cfg(test)]
mod tests {
    // Presumably needed because rstest derives per-case test names from `#[values(...)]`
    // arguments such as `NetworkType::Tonic`.
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use mysten_metrics::RegistryService;
    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
    use prometheus::Registry;
    use rstest::rstest;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        // Keep `temp_dir` alive for the whole test so the directory is cleaned up on drop,
        // instead of persisting it on disk with `keep()`.
        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ConsensusProtocolConfig::for_testing(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    // TODO: build AuthorityFixture.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ConsensusProtocolConfig::for_testing();
        protocol_config.set_gc_depth_for_testing(gc_depth);

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut commit_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        submit_and_verify_transactions(&authorities, &mut commit_receivers).await;

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 and let it run.
        let (authority, commit_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        commit_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config = ConsensusProtocolConfig::for_testing();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        submit_and_verify_transactions(&authorities, &mut output_receivers).await;

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 and let it run.
        let (authority, commit_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ConsensusProtocolConfig::for_testing();
        protocol_config.set_gc_depth_for_testing(gc_depth);

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                NetworkType::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Watch authority 1's commit receiver until a committed block authored by authority 1
        // shows up. That way we are sure at least one of its blocks has been proposed and
        // successfully received by a quorum of nodes.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authority 1 & 2.
        // * Authority 1 will have its DB wiped out to practically "force" amnesia recovery.
        // * Authority 2 is stopped in order to simulate less than f+1 availability, which will
        //   make authority 1 retry during amnesia recovery until it finally gets back f+1
        //   responses once authority 2 is up and running again.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Authority 1: create a new directory to simulate amnesia. The node has participated in
        // consensus previously, but will now attempt to synchronize its last own block and
        // recover from there. It won't be able to do that successfully while authority 2 is
        // still down.
        let dir = TempDir::new().unwrap();
        // We do reset the boot counter for this one to simulate a "binary" restart
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            NetworkType::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Now spin up authority 2 using its earlier directory - so no amnesia recovery should be
        // forced here. Authority 1 should then be able to recover from amnesia successfully.
        let (authority, _commit_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            NetworkType::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait until we see at least one committed block authored by authority 1. Bound the
        // whole wait so a failed recovery fails the test instead of hanging it forever.
        timeout(Duration::from_secs(60), async {
            'outer: while let Some(result) = commit_receiver.recv().await {
                for block in result.blocks {
                    if block.round() > GENESIS_ROUND && block.author() == index_1 {
                        break 'outer;
                    }
                }
            }
        })
        .await
        .expect("Timed out waiting for authority 1 to commit an own block after amnesia recovery");

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    /// Submits a fixed set of distinct transactions round-robin across `authorities`, then
    /// asserts that every receiver observes all of them committed, with no unexpected or
    /// duplicate transaction seen along the way.
    async fn submit_and_verify_transactions(
        authorities: &[ConsensusAuthority],
        receivers: &mut [UnboundedReceiver<CommittedSubDag>],
    ) {
        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in receivers.iter_mut() {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }
    }

    /// Starts an authority at `index` storing its DB under `db_dir`, with small cache and
    /// commit-sync settings, and returns it together with its commit output receiver.
    // TODO: create a fixture
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: NetworkType,
        boot_counter: u64,
        protocol_config: ConsensusProtocolConfig,
    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
        let registry = Registry::new();

        // Cache less blocks to exercise commit sync.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, commit_receiver)
    }
}