consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
7use itertools::Itertools;
8use mysten_metrics::spawn_logged_monitored_task;
9use parking_lot::RwLock;
10use prometheus::Registry;
11use sui_protocol_config::ProtocolConfig;
12use tokio::task::JoinHandle;
13use tracing::{info, warn};
14
15use crate::{
16    CommitConsumerArgs,
17    authority_service::AuthorityService,
18    block_manager::BlockManager,
19    block_verifier::SignedBlockVerifier,
20    commit_observer::CommitObserver,
21    commit_syncer::{CommitSyncer, CommitSyncerHandle},
22    commit_vote_monitor::CommitVoteMonitor,
23    context::{Clock, Context},
24    core::{Core, CoreSignals},
25    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26    dag_state::DagState,
27    leader_schedule::LeaderSchedule,
28    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29    metrics::initialise_metrics,
30    network::{NetworkManager, tonic_network::TonicManager},
31    proposed_block_handler::ProposedBlockHandler,
32    round_prober::{RoundProber, RoundProberHandle},
33    round_tracker::PeerRoundTracker,
34    storage::rocksdb_store::RocksDBStore,
35    subscriber::Subscriber,
36    synchronizer::{Synchronizer, SynchronizerHandle},
37    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
38    transaction_certifier::TransactionCertifier,
39};
40
/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
///
/// Each variant wraps an `AuthorityNode` specialised to one network stack; the only
/// variant today is Tonic (see [`NetworkType`]).
// `AuthorityNode` is `pub(crate)` but appears inside this `pub` enum, which triggers the
// `private_interfaces` lint; the allow silences it so the concrete node type stays
// crate-private while the wrapper is public.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Authority node running on the Tonic-based network manager (`TonicManager`).
    WithTonic(AuthorityNode<TonicManager>),
}
47
48impl ConsensusAuthority {
49    pub async fn start(
50        network_type: NetworkType,
51        epoch_start_timestamp_ms: u64,
52        own_index: AuthorityIndex,
53        committee: Committee,
54        parameters: Parameters,
55        protocol_config: ProtocolConfig,
56        protocol_keypair: ProtocolKeyPair,
57        network_keypair: NetworkKeyPair,
58        clock: Arc<Clock>,
59        transaction_verifier: Arc<dyn TransactionVerifier>,
60        commit_consumer: CommitConsumerArgs,
61        registry: Registry,
62        // A counter that keeps track of how many times the authority node has been booted while the binary
63        // or the component that is calling the `ConsensusAuthority` has been running. It's mostly useful to
64        // make decisions on whether amnesia recovery should run or not. When `boot_counter` is 0, then `ConsensusAuthority`
65        // will initiate the process of amnesia recovery if that's enabled in the parameters.
66        boot_counter: u64,
67    ) -> Self {
68        match network_type {
69            NetworkType::Tonic => {
70                let authority = AuthorityNode::start(
71                    epoch_start_timestamp_ms,
72                    own_index,
73                    committee,
74                    parameters,
75                    protocol_config,
76                    protocol_keypair,
77                    network_keypair,
78                    clock,
79                    transaction_verifier,
80                    commit_consumer,
81                    registry,
82                    boot_counter,
83                )
84                .await;
85                Self::WithTonic(authority)
86            }
87        }
88    }
89
90    pub async fn stop(self) {
91        match self {
92            Self::WithTonic(authority) => authority.stop().await,
93        }
94    }
95
96    pub fn transaction_client(&self) -> Arc<TransactionClient> {
97        match self {
98            Self::WithTonic(authority) => authority.transaction_client(),
99        }
100    }
101
102    #[cfg(test)]
103    fn context(&self) -> &Arc<Context> {
104        match self {
105            Self::WithTonic(authority) => &authority.context,
106        }
107    }
108}
109
/// Selects which network stack a [`ConsensusAuthority`] is started with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkType {
    /// Use the Tonic-based network manager (`TonicManager`).
    Tonic,
}
114
/// A running consensus authority, generic over the network stack `N`.
///
/// Holds the shared [`Context`], the handles of the background components created by
/// `start`, and the network manager; `stop` consumes the node and shuts them down.
///
/// NOTE(review): struct fields are dropped in declaration order; confirm whether any
/// component relies on that order before reordering these fields.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    /// Shared state built in `start` from the committee, parameters, protocol config,
    /// metrics and clock.
    context: Arc<Context>,
    /// Instant captured during `start` (right after the context is created); used for the
    /// run-time log and the uptime metric in `stop`.
    start_time: Instant,
    /// Client handed out to callers for submitting transactions to this authority.
    transaction_client: Arc<TransactionClient>,
    /// Handle to the synchronizer; it is the first component stopped in `stop`.
    synchronizer: Arc<SynchronizerHandle>,

    /// Handle to the started commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle to the started round prober.
    round_prober_handle: RoundProberHandle,
    /// Task spawned for `ProposedBlockHandler::run`; aborted (not awaited) in `stop`.
    proposed_block_handler: JoinHandle<()>,
    /// Handle to the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle returned by `ChannelCoreThreadDispatcher::start` for the `Core` it drives.
    core_thread_handle: CoreThreadHandle,
    /// Block subscriptions to every other authority in the committee.
    subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
    /// Network manager that serves `AuthorityService` and provides the network client.
    network_manager: N,
}
132
133impl<N> AuthorityNode<N>
134where
135    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
136{
137    pub(crate) async fn start(
138        epoch_start_timestamp_ms: u64,
139        own_index: AuthorityIndex,
140        committee: Committee,
141        parameters: Parameters,
142        protocol_config: ProtocolConfig,
143        // To avoid accidentally leaking the private key, the protocol key pair should only be
144        // kept in Core.
145        protocol_keypair: ProtocolKeyPair,
146        network_keypair: NetworkKeyPair,
147        clock: Arc<Clock>,
148        transaction_verifier: Arc<dyn TransactionVerifier>,
149        commit_consumer: CommitConsumerArgs,
150        registry: Registry,
151        boot_counter: u64,
152    ) -> Self {
153        assert!(
154            committee.is_valid_index(own_index),
155            "Invalid own index {}",
156            own_index
157        );
158        let own_hostname = committee.authority(own_index).hostname.clone();
159        info!(
160            "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
161            own_index,
162            own_hostname,
163            protocol_config.version,
164            epoch_start_timestamp_ms,
165            boot_counter,
166            commit_consumer.replay_after_commit_index,
167            commit_consumer.consumer_last_processed_commit_index
168        );
169        info!(
170            "Consensus authorities: {}",
171            committee
172                .authorities()
173                .map(|(i, a)| format!("{}: {}", i, a.hostname))
174                .join(", ")
175        );
176        info!("Consensus parameters: {:?}", parameters);
177        info!("Consensus committee: {:?}", committee);
178        let context = Arc::new(Context::new(
179            epoch_start_timestamp_ms,
180            own_index,
181            committee,
182            parameters,
183            protocol_config,
184            initialise_metrics(registry),
185            clock,
186        ));
187        let start_time = Instant::now();
188
189        context
190            .metrics
191            .node_metrics
192            .authority_index
193            .with_label_values(&[&own_hostname])
194            .set(context.own_index.value() as i64);
195        context
196            .metrics
197            .node_metrics
198            .protocol_version
199            .set(context.protocol_config.version.as_u64() as i64);
200
201        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
202        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
203
204        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
205
206        let mut network_manager = N::new(context.clone(), network_keypair);
207        let network_client = network_manager.client();
208
209        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
210        let store = Arc::new(RocksDBStore::new(store_path));
211        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
212
213        let block_verifier = Arc::new(SignedBlockVerifier::new(
214            context.clone(),
215            transaction_verifier,
216        ));
217
218        let transaction_certifier = TransactionCertifier::new(
219            context.clone(),
220            block_verifier.clone(),
221            dag_state.clone(),
222            commit_consumer.block_sender.clone(),
223        );
224
225        let mut proposed_block_handler = ProposedBlockHandler::new(
226            context.clone(),
227            signals_receivers.block_broadcast_receiver(),
228            transaction_certifier.clone(),
229        );
230
231        let proposed_block_handler =
232            spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
233
234        let sync_last_known_own_block = boot_counter == 0
235            && dag_state.read().highest_accepted_round() == 0
236            && !context
237                .parameters
238                .sync_last_known_own_block_timeout
239                .is_zero();
240        info!("Sync last known own block: {sync_last_known_own_block}");
241
242        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
243
244        let leader_schedule = Arc::new(LeaderSchedule::from_store(
245            context.clone(),
246            dag_state.clone(),
247        ));
248
249        let commit_consumer_monitor = commit_consumer.monitor();
250        let commit_observer = CommitObserver::new(
251            context.clone(),
252            commit_consumer,
253            dag_state.clone(),
254            transaction_certifier.clone(),
255            leader_schedule.clone(),
256        )
257        .await;
258
259        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
260
261        let core = Core::new(
262            context.clone(),
263            leader_schedule,
264            tx_consumer,
265            transaction_certifier.clone(),
266            block_manager,
267            commit_observer,
268            core_signals,
269            protocol_keypair,
270            dag_state.clone(),
271            sync_last_known_own_block,
272            round_tracker.clone(),
273        );
274
275        let (core_dispatcher, core_thread_handle) =
276            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
277        let core_dispatcher = Arc::new(core_dispatcher);
278        let leader_timeout_handle =
279            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
280
281        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
282
283        let synchronizer = Synchronizer::start(
284            network_client.clone(),
285            context.clone(),
286            core_dispatcher.clone(),
287            commit_vote_monitor.clone(),
288            block_verifier.clone(),
289            transaction_certifier.clone(),
290            dag_state.clone(),
291            sync_last_known_own_block,
292        );
293
294        let commit_syncer_handle = CommitSyncer::new(
295            context.clone(),
296            core_dispatcher.clone(),
297            commit_vote_monitor.clone(),
298            commit_consumer_monitor.clone(),
299            block_verifier.clone(),
300            transaction_certifier.clone(),
301            network_client.clone(),
302            dag_state.clone(),
303        )
304        .start();
305
306        let round_prober_handle = RoundProber::new(
307            context.clone(),
308            core_dispatcher.clone(),
309            round_tracker.clone(),
310            dag_state.clone(),
311            network_client.clone(),
312        )
313        .start();
314
315        let network_service = Arc::new(AuthorityService::new(
316            context.clone(),
317            block_verifier,
318            commit_vote_monitor,
319            round_tracker.clone(),
320            synchronizer.clone(),
321            core_dispatcher,
322            signals_receivers.block_broadcast_receiver(),
323            transaction_certifier,
324            dag_state.clone(),
325            store,
326        ));
327
328        let subscriber = {
329            let s = Subscriber::new(
330                context.clone(),
331                network_client,
332                network_service.clone(),
333                dag_state,
334            );
335            for (peer, _) in context.committee.authorities() {
336                if peer != context.own_index {
337                    s.subscribe(peer);
338                }
339            }
340            s
341        };
342
343        network_manager.install_service(network_service).await;
344
345        info!(
346            "Consensus authority started, took {:?}",
347            start_time.elapsed()
348        );
349
350        Self {
351            context,
352            start_time,
353            transaction_client: Arc::new(tx_client),
354            synchronizer,
355            commit_syncer_handle,
356            round_prober_handle,
357            proposed_block_handler,
358            leader_timeout_handle,
359            core_thread_handle,
360            subscriber,
361            network_manager,
362        }
363    }
364
365    pub(crate) async fn stop(mut self) {
366        info!(
367            "Stopping authority. Total run time: {:?}",
368            self.start_time.elapsed()
369        );
370
371        // First shutdown components calling into Core.
372        if let Err(e) = self.synchronizer.stop().await {
373            if e.is_panic() {
374                std::panic::resume_unwind(e.into_panic());
375            }
376            warn!(
377                "Failed to stop synchronizer when shutting down consensus: {:?}",
378                e
379            );
380        };
381        self.commit_syncer_handle.stop().await;
382        self.round_prober_handle.stop().await;
383        self.proposed_block_handler.abort();
384        self.leader_timeout_handle.stop().await;
385        // Shutdown Core to stop block productions and broadcast.
386        self.core_thread_handle.stop().await;
387        // Stop block subscriptions before stopping network server.
388        self.subscriber.stop();
389        self.network_manager.stop().await;
390
391        self.context
392            .metrics
393            .node_metrics
394            .uptime
395            .observe(self.start_time.elapsed().as_secs_f64());
396    }
397
398    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
399        self.transaction_client.clone()
400    }
401}
402
#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use mysten_metrics::RegistryService;
    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
    use prometheus::Registry;
    use rstest::rstest;
    use sui_protocol_config::ProtocolConfig;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    // TODO: build AuthorityFixture.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut commit_receivers = Vec::with_capacity(committee.size());
        // Kept alive (not read) for the duration of the test.
        let mut block_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let submitted_transactions = submit_transactions(&authorities, NUM_TRANSACTIONS).await;
        assert_transactions_committed(&mut commit_receivers, &submitted_transactions).await;

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 and let it run.
        let (authority, commit_receiver, block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        commit_receivers[index] = commit_receiver;
        block_receivers[index] = block_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, _block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let submitted_transactions = submit_transactions(&authorities, NUM_TRANSACTIONS).await;
        assert_transactions_committed(&mut output_receivers, &submitted_transactions).await;

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 and let it run.
        let (authority, commit_receiver, _block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut block_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                NetworkType::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Take the commit receiver of authority 1 and wait until it outputs at least one
        // committed block authored by authority 1. That way we are sure that at least one
        // block has been proposed and successfully received by a quorum of nodes.
        let index_1 = committee.to_authority_index(1).unwrap();
        wait_for_committed_block_from(
            &mut commit_receivers[index_1],
            index_1,
            Some(Duration::from_secs(10)),
        )
        .await;

        // Stop authority 1 & 2.
        // * Authority 1 will have its DB wiped out to practically "force" amnesia recovery.
        // * Authority 2 is stopped to simulate less than f+1 availability, which makes
        //   authority 1 retry amnesia recovery until it finally gets back f+1 responses
        //   once authority 2 is up and running again.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Authority 1: create a new directory to simulate amnesia. The node has participated
        // in consensus before, but now attempts to synchronize its last own block and recover
        // from there. It cannot succeed yet because authority 2 is still down.
        let dir = TempDir::new().unwrap();
        // Reset the boot counter for this one to simulate a "binary" restart.
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver, _block_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            NetworkType::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Now spin up authority 2 using its earlier directory, so no amnesia recovery is
        // forced there. Authority 1 should then be able to recover from amnesia successfully.
        let (authority, _commit_receiver, _block_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            NetworkType::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait until the recovered authority 1 commits at least one block it authored.
        wait_for_committed_block_from(&mut commit_receiver, index_1, None).await;

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    /// Submits `count` distinct 16-byte transactions (`[i; 16]` for `i` in `0..count`),
    /// spreading them round-robin across `authorities`, and returns the submitted set.
    async fn submit_transactions(
        authorities: &[ConsensusAuthority],
        count: u8,
    ) -> BTreeSet<Vec<u8>> {
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..count {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }
        submitted_transactions
    }

    /// For every receiver, reads committed sub-dags until every transaction in `submitted`
    /// has been observed.
    ///
    /// Panics if a commit takes longer than one second to arrive, if a transaction that
    /// was not submitted (or was already seen) shows up, or if any sub-dag carries
    /// reputation scores.
    async fn assert_transactions_committed(
        receivers: &mut [UnboundedReceiver<CommittedSubDag>],
        submitted: &BTreeSet<Vec<u8>>,
    ) {
        for receiver in receivers.iter_mut() {
            let mut expected_transactions = submitted.clone();
            loop {
                let committed_subdag = timeout(Duration::from_secs(1), receiver.recv())
                    .await
                    .unwrap()
                    .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }
    }

    /// Reads from `receiver` until a committed sub-dag contains a non-genesis block
    /// authored by `author`.
    ///
    /// When `recv_timeout` is `Some`, each individual `recv` must complete within it.
    /// A closed commit channel panics, so the test fails instead of silently ending the
    /// wait without having observed the block.
    async fn wait_for_committed_block_from(
        receiver: &mut UnboundedReceiver<CommittedSubDag>,
        author: AuthorityIndex,
        recv_timeout: Option<Duration>,
    ) {
        loop {
            let next = match recv_timeout {
                Some(limit) => timeout(limit, receiver.recv()).await.unwrap_or_else(|_| {
                    panic!(
                        "Timed out while waiting for at least one committed block from authority {author}"
                    )
                }),
                None => receiver.recv().await,
            };
            let committed_subdag = next.unwrap_or_else(|| {
                panic!(
                    "Commit channel closed before a committed block from authority {author} was observed"
                )
            });
            if committed_subdag
                .blocks
                .iter()
                .any(|block| block.round() > GENESIS_ROUND && block.author() == author)
            {
                return;
            }
        }
    }

    // TODO: create a fixture
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: NetworkType,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (
        ConsensusAuthority,
        UnboundedReceiver<CommittedSubDag>,
        UnboundedReceiver<CertifiedBlocksOutput>,
    ) {
        let registry = Registry::new();

        // Cache less blocks to exercise commit sync.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, commit_receiver, block_receiver)
    }
}