consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
7use itertools::Itertools;
8use mysten_metrics::spawn_logged_monitored_task;
9use parking_lot::RwLock;
10use prometheus::Registry;
11use sui_protocol_config::ProtocolConfig;
12use tokio::task::JoinHandle;
13use tracing::{info, warn};
14
15use crate::{
16    CommitConsumerArgs,
17    authority_service::AuthorityService,
18    block_manager::BlockManager,
19    block_verifier::SignedBlockVerifier,
20    commit_observer::CommitObserver,
21    commit_syncer::{CommitSyncer, CommitSyncerHandle},
22    commit_vote_monitor::CommitVoteMonitor,
23    context::{Clock, Context},
24    core::{Core, CoreSignals},
25    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26    dag_state::DagState,
27    leader_schedule::LeaderSchedule,
28    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29    metrics::initialise_metrics,
30    network::{NetworkManager, tonic_network::TonicManager},
31    proposed_block_handler::ProposedBlockHandler,
32    round_prober::{RoundProber, RoundProberHandle},
33    round_tracker::PeerRoundTracker,
34    storage::rocksdb_store::RocksDBStore,
35    subscriber::Subscriber,
36    synchronizer::{Synchronizer, SynchronizerHandle},
37    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
38    transaction_certifier::TransactionCertifier,
39};
40
/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
// `AuthorityNode` is only `pub(crate)` but appears in a variant of this `pub` enum,
// which is what the `private_interfaces` lint would otherwise flag.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// An authority node whose network manager is `TonicManager`.
    WithTonic(AuthorityNode<TonicManager>),
}
47
48impl ConsensusAuthority {
49    pub async fn start(
50        network_type: NetworkType,
51        epoch_start_timestamp_ms: u64,
52        own_index: AuthorityIndex,
53        committee: Committee,
54        parameters: Parameters,
55        protocol_config: ProtocolConfig,
56        protocol_keypair: ProtocolKeyPair,
57        network_keypair: NetworkKeyPair,
58        clock: Arc<Clock>,
59        transaction_verifier: Arc<dyn TransactionVerifier>,
60        commit_consumer: CommitConsumerArgs,
61        registry: Registry,
62        // A counter that keeps track of how many times the consensus authority has been booted while the process
63        // has been running. It's useful for making decisions on whether amnesia recovery should run.
64        // When `boot_counter` is 0, `ConsensusAuthority` will initiate the process of amnesia recovery if that's enabled in the parameters.
65        boot_counter: u64,
66    ) -> Self {
67        match network_type {
68            NetworkType::Tonic => {
69                let authority = AuthorityNode::start(
70                    epoch_start_timestamp_ms,
71                    own_index,
72                    committee,
73                    parameters,
74                    protocol_config,
75                    protocol_keypair,
76                    network_keypair,
77                    clock,
78                    transaction_verifier,
79                    commit_consumer,
80                    registry,
81                    boot_counter,
82                )
83                .await;
84                Self::WithTonic(authority)
85            }
86        }
87    }
88
89    pub async fn stop(self) {
90        match self {
91            Self::WithTonic(authority) => authority.stop().await,
92        }
93    }
94
95    pub fn transaction_client(&self) -> Arc<TransactionClient> {
96        match self {
97            Self::WithTonic(authority) => authority.transaction_client(),
98        }
99    }
100
101    #[cfg(test)]
102    fn context(&self) -> &Arc<Context> {
103        match self {
104            Self::WithTonic(authority) => &authority.context,
105        }
106    }
107}
108
/// Selects which network stack `ConsensusAuthority::start()` builds the authority on.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    /// Start the authority as `ConsensusAuthority::WithTonic`, i.e. backed by `TonicManager`.
    Tonic,
}
113
/// A started consensus authority, generic over the network manager `N`.
///
/// It keeps the handles of the components created in `start()` so that `stop()` can shut
/// them down.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    // Context shared with every component created in `start()`.
    context: Arc<Context>,
    // Captured in `start()`; used to log the start duration and total run time, and to
    // record the uptime metric in `stop()`.
    start_time: Instant,
    // Handed out to callers through `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    // Stopped first in `stop()`; also shared with the network service.
    synchronizer: Arc<SynchronizerHandle>,

    // Handles of the remaining components, each stopped (or aborted) in `stop()`.
    commit_syncer_handle: CommitSyncerHandle,
    round_prober_handle: RoundProberHandle,
    // Join handle of the spawned "proposed_block_handler" task; aborted in `stop()`.
    proposed_block_handler: JoinHandle<()>,
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    core_thread_handle: CoreThreadHandle,
    // Block subscriptions to the other authorities of the committee.
    subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
    // Stopped last in `stop()`.
    network_manager: N,
}
131
132impl<N> AuthorityNode<N>
133where
134    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
135{
136    // See comments above ConsensusAuthority::start() for details on the input.
137    pub(crate) async fn start(
138        epoch_start_timestamp_ms: u64,
139        own_index: AuthorityIndex,
140        committee: Committee,
141        parameters: Parameters,
142        protocol_config: ProtocolConfig,
143        protocol_keypair: ProtocolKeyPair,
144        network_keypair: NetworkKeyPair,
145        clock: Arc<Clock>,
146        transaction_verifier: Arc<dyn TransactionVerifier>,
147        commit_consumer: CommitConsumerArgs,
148        registry: Registry,
149        boot_counter: u64,
150    ) -> Self {
151        assert!(
152            committee.is_valid_index(own_index),
153            "Invalid own index {}",
154            own_index
155        );
156        let own_hostname = committee.authority(own_index).hostname.clone();
157        info!(
158            "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
159            own_index,
160            own_hostname,
161            protocol_config.version,
162            epoch_start_timestamp_ms,
163            boot_counter,
164            commit_consumer.replay_after_commit_index,
165            commit_consumer.consumer_last_processed_commit_index
166        );
167        info!(
168            "Consensus authorities: {}",
169            committee
170                .authorities()
171                .map(|(i, a)| format!("{}: {}", i, a.hostname))
172                .join(", ")
173        );
174        info!("Consensus parameters: {:?}", parameters);
175        info!("Consensus committee: {:?}", committee);
176        let context = Arc::new(Context::new(
177            epoch_start_timestamp_ms,
178            own_index,
179            committee,
180            parameters,
181            protocol_config,
182            initialise_metrics(registry),
183            clock,
184        ));
185        let start_time = Instant::now();
186
187        context
188            .metrics
189            .node_metrics
190            .authority_index
191            .with_label_values(&[&own_hostname])
192            .set(context.own_index.value() as i64);
193        context
194            .metrics
195            .node_metrics
196            .protocol_version
197            .set(context.protocol_config.version.as_u64() as i64);
198
199        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
200        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
201
202        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
203
204        let mut network_manager = N::new(context.clone(), network_keypair);
205        let network_client = network_manager.client();
206
207        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
208        let store = Arc::new(RocksDBStore::new(store_path));
209        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
210
211        let block_verifier = Arc::new(SignedBlockVerifier::new(
212            context.clone(),
213            transaction_verifier,
214        ));
215
216        let transaction_certifier = TransactionCertifier::new(
217            context.clone(),
218            block_verifier.clone(),
219            dag_state.clone(),
220            commit_consumer.block_sender.clone(),
221        );
222
223        let mut proposed_block_handler = ProposedBlockHandler::new(
224            context.clone(),
225            signals_receivers.block_broadcast_receiver(),
226            transaction_certifier.clone(),
227        );
228
229        let proposed_block_handler =
230            spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
231
232        let sync_last_known_own_block = boot_counter == 0
233            && !context
234                .parameters
235                .sync_last_known_own_block_timeout
236                .is_zero();
237        info!(
238            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
239            sync_last_known_own_block,
240            boot_counter,
241            context.parameters.sync_last_known_own_block_timeout
242        );
243
244        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
245
246        let leader_schedule = Arc::new(LeaderSchedule::from_store(
247            context.clone(),
248            dag_state.clone(),
249        ));
250
251        let commit_consumer_monitor = commit_consumer.monitor();
252        let commit_observer = CommitObserver::new(
253            context.clone(),
254            commit_consumer,
255            dag_state.clone(),
256            transaction_certifier.clone(),
257            leader_schedule.clone(),
258        )
259        .await;
260
261        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
262
263        // To avoid accidentally leaking the private key, the protocol key pair should only be
264        // kept in Core.
265        let core = Core::new(
266            context.clone(),
267            leader_schedule,
268            tx_consumer,
269            transaction_certifier.clone(),
270            block_manager,
271            commit_observer,
272            core_signals,
273            protocol_keypair,
274            dag_state.clone(),
275            sync_last_known_own_block,
276            round_tracker.clone(),
277        );
278
279        let (core_dispatcher, core_thread_handle) =
280            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
281        let core_dispatcher = Arc::new(core_dispatcher);
282        let leader_timeout_handle =
283            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
284
285        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
286
287        let synchronizer = Synchronizer::start(
288            network_client.clone(),
289            context.clone(),
290            core_dispatcher.clone(),
291            commit_vote_monitor.clone(),
292            block_verifier.clone(),
293            transaction_certifier.clone(),
294            dag_state.clone(),
295            sync_last_known_own_block,
296        );
297
298        let commit_syncer_handle = CommitSyncer::new(
299            context.clone(),
300            core_dispatcher.clone(),
301            commit_vote_monitor.clone(),
302            commit_consumer_monitor.clone(),
303            block_verifier.clone(),
304            transaction_certifier.clone(),
305            network_client.clone(),
306            dag_state.clone(),
307        )
308        .start();
309
310        let round_prober_handle = RoundProber::new(
311            context.clone(),
312            core_dispatcher.clone(),
313            round_tracker.clone(),
314            dag_state.clone(),
315            network_client.clone(),
316        )
317        .start();
318
319        let network_service = Arc::new(AuthorityService::new(
320            context.clone(),
321            block_verifier,
322            commit_vote_monitor,
323            round_tracker.clone(),
324            synchronizer.clone(),
325            core_dispatcher,
326            signals_receivers.block_broadcast_receiver(),
327            transaction_certifier,
328            dag_state.clone(),
329            store,
330        ));
331
332        let subscriber = {
333            let s = Subscriber::new(
334                context.clone(),
335                network_client,
336                network_service.clone(),
337                dag_state,
338            );
339            for (peer, _) in context.committee.authorities() {
340                if peer != context.own_index {
341                    s.subscribe(peer);
342                }
343            }
344            s
345        };
346
347        network_manager.install_service(network_service).await;
348
349        info!(
350            "Consensus authority started, took {:?}",
351            start_time.elapsed()
352        );
353
354        Self {
355            context,
356            start_time,
357            transaction_client: Arc::new(tx_client),
358            synchronizer,
359            commit_syncer_handle,
360            round_prober_handle,
361            proposed_block_handler,
362            leader_timeout_handle,
363            core_thread_handle,
364            subscriber,
365            network_manager,
366        }
367    }
368
369    pub(crate) async fn stop(mut self) {
370        info!(
371            "Stopping authority. Total run time: {:?}",
372            self.start_time.elapsed()
373        );
374
375        // First shutdown components calling into Core.
376        if let Err(e) = self.synchronizer.stop().await {
377            if e.is_panic() {
378                std::panic::resume_unwind(e.into_panic());
379            }
380            warn!(
381                "Failed to stop synchronizer when shutting down consensus: {:?}",
382                e
383            );
384        };
385        self.commit_syncer_handle.stop().await;
386        self.round_prober_handle.stop().await;
387        self.proposed_block_handler.abort();
388        self.leader_timeout_handle.stop().await;
389        // Shutdown Core to stop block productions and broadcast.
390        self.core_thread_handle.stop().await;
391        // Stop block subscriptions before stopping network server.
392        self.subscriber.stop();
393        self.network_manager.stop().await;
394
395        self.context
396            .metrics
397            .node_metrics
398            .uptime
399            .observe(self.start_time.elapsed().as_secs_f64());
400    }
401
402    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
403        self.transaction_client.clone()
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    #![allow(non_snake_case)]
410
411    use std::{
412        collections::{BTreeMap, BTreeSet},
413        sync::Arc,
414        time::Duration,
415    };
416
417    use consensus_config::{Parameters, local_committee_and_keys};
418    use mysten_metrics::RegistryService;
419    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
420    use prometheus::Registry;
421    use rstest::rstest;
422    use sui_protocol_config::ProtocolConfig;
423    use tempfile::TempDir;
424    use tokio::time::{sleep, timeout};
425    use typed_store::DBMetrics;
426
427    use super::*;
428    use crate::{
429        CommittedSubDag,
430        block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
431        transaction::NoopTransactionVerifier,
432    };
433
434    #[rstest]
435    #[tokio::test]
436    async fn test_authority_start_and_stop(
437        #[values(NetworkType::Tonic)] network_type: NetworkType,
438    ) {
439        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
440        let registry = Registry::new();
441
442        let temp_dir = TempDir::new().unwrap();
443        let parameters = Parameters {
444            db_path: temp_dir.keep(),
445            ..Default::default()
446        };
447        let txn_verifier = NoopTransactionVerifier {};
448
449        let own_index = committee.to_authority_index(0).unwrap();
450        let protocol_keypair = keypairs[own_index].1.clone();
451        let network_keypair = keypairs[own_index].0.clone();
452
453        let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);
454
455        let authority = ConsensusAuthority::start(
456            network_type,
457            0,
458            own_index,
459            committee,
460            parameters,
461            ProtocolConfig::get_for_max_version_UNSAFE(),
462            protocol_keypair,
463            network_keypair,
464            Arc::new(Clock::default()),
465            Arc::new(txn_verifier),
466            commit_consumer,
467            registry,
468            0,
469        )
470        .await;
471
472        assert_eq!(authority.context().own_index, own_index);
473        assert_eq!(authority.context().committee.epoch(), 0);
474        assert_eq!(authority.context().committee.size(), 1);
475
476        authority.stop().await;
477    }
478
479    // TODO: build AuthorityFixture.
480    #[rstest]
481    #[tokio::test(flavor = "current_thread")]
482    async fn test_authority_committee(
483        #[values(NetworkType::Tonic)] network_type: NetworkType,
484        #[values(5, 10)] gc_depth: u32,
485    ) {
486        telemetry_subscribers::init_for_testing();
487        let db_registry = Registry::new();
488        DBMetrics::init(RegistryService::new(db_registry));
489
490        const NUM_OF_AUTHORITIES: usize = 4;
491        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
492        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
493        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
494
495        let temp_dirs = (0..NUM_OF_AUTHORITIES)
496            .map(|_| TempDir::new().unwrap())
497            .collect::<Vec<_>>();
498
499        let mut commit_receivers = Vec::with_capacity(committee.size());
500        let mut block_receivers = Vec::with_capacity(committee.size());
501        let mut authorities = Vec::with_capacity(committee.size());
502        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
503
504        for (index, _authority_info) in committee.authorities() {
505            let (authority, commit_receiver, block_receiver) = make_authority(
506                index,
507                &temp_dirs[index.value()],
508                committee.clone(),
509                keypairs.clone(),
510                network_type,
511                boot_counters[index],
512                protocol_config.clone(),
513            )
514            .await;
515            boot_counters[index] += 1;
516            commit_receivers.push(commit_receiver);
517            block_receivers.push(block_receiver);
518            authorities.push(authority);
519        }
520
521        const NUM_TRANSACTIONS: u8 = 15;
522        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
523        for i in 0..NUM_TRANSACTIONS {
524            let txn = vec![i; 16];
525            submitted_transactions.insert(txn.clone());
526            authorities[i as usize % authorities.len()]
527                .transaction_client()
528                .submit(vec![txn])
529                .await
530                .unwrap();
531        }
532
533        for receiver in &mut commit_receivers {
534            let mut expected_transactions = submitted_transactions.clone();
535            loop {
536                let committed_subdag =
537                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
538                        .await
539                        .unwrap()
540                        .unwrap();
541                for b in committed_subdag.blocks {
542                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
543                        assert!(
544                            expected_transactions.remove(&txn),
545                            "Transaction not submitted or already seen: {:?}",
546                            txn
547                        );
548                    }
549                }
550                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
551                if expected_transactions.is_empty() {
552                    break;
553                }
554            }
555        }
556
557        // Stop authority 1.
558        let index = committee.to_authority_index(1).unwrap();
559        authorities.remove(index.value()).stop().await;
560        sleep(Duration::from_secs(10)).await;
561
562        // Restart authority 1 and let it run.
563        let (authority, commit_receiver, block_receiver) = make_authority(
564            index,
565            &temp_dirs[index.value()],
566            committee.clone(),
567            keypairs.clone(),
568            network_type,
569            boot_counters[index],
570            protocol_config.clone(),
571        )
572        .await;
573        boot_counters[index] += 1;
574        commit_receivers[index] = commit_receiver;
575        block_receivers[index] = block_receiver;
576        authorities.insert(index.value(), authority);
577        sleep(Duration::from_secs(10)).await;
578
579        // Stop all authorities and exit.
580        for authority in authorities {
581            authority.stop().await;
582        }
583    }
584
585    #[rstest]
586    #[tokio::test(flavor = "current_thread")]
587    async fn test_small_committee(
588        #[values(NetworkType::Tonic)] network_type: NetworkType,
589        #[values(1, 2, 3)] num_authorities: usize,
590    ) {
591        telemetry_subscribers::init_for_testing();
592        let db_registry = Registry::new();
593        DBMetrics::init(RegistryService::new(db_registry));
594
595        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
596        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
597
598        let temp_dirs = (0..num_authorities)
599            .map(|_| TempDir::new().unwrap())
600            .collect::<Vec<_>>();
601
602        let mut output_receivers = Vec::with_capacity(committee.size());
603        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
604        let mut boot_counters = vec![0; num_authorities];
605
606        for (index, _authority_info) in committee.authorities() {
607            let (authority, commit_receiver, _block_receiver) = make_authority(
608                index,
609                &temp_dirs[index.value()],
610                committee.clone(),
611                keypairs.clone(),
612                network_type,
613                boot_counters[index],
614                protocol_config.clone(),
615            )
616            .await;
617            boot_counters[index] += 1;
618            output_receivers.push(commit_receiver);
619            authorities.push(authority);
620        }
621
622        const NUM_TRANSACTIONS: u8 = 15;
623        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
624        for i in 0..NUM_TRANSACTIONS {
625            let txn = vec![i; 16];
626            submitted_transactions.insert(txn.clone());
627            authorities[i as usize % authorities.len()]
628                .transaction_client()
629                .submit(vec![txn])
630                .await
631                .unwrap();
632        }
633
634        for receiver in &mut output_receivers {
635            let mut expected_transactions = submitted_transactions.clone();
636            loop {
637                let committed_subdag =
638                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
639                        .await
640                        .unwrap()
641                        .unwrap();
642                for b in committed_subdag.blocks {
643                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
644                        assert!(
645                            expected_transactions.remove(&txn),
646                            "Transaction not submitted or already seen: {:?}",
647                            txn
648                        );
649                    }
650                }
651                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
652                if expected_transactions.is_empty() {
653                    break;
654                }
655            }
656        }
657
658        // Stop authority 0.
659        let index = committee.to_authority_index(0).unwrap();
660        authorities.remove(index.value()).stop().await;
661        sleep(Duration::from_secs(10)).await;
662
663        // Restart authority 0 and let it run.
664        let (authority, commit_receiver, _block_receiver) = make_authority(
665            index,
666            &temp_dirs[index.value()],
667            committee.clone(),
668            keypairs.clone(),
669            network_type,
670            boot_counters[index],
671            protocol_config.clone(),
672        )
673        .await;
674        boot_counters[index] += 1;
675        output_receivers[index] = commit_receiver;
676        authorities.insert(index.value(), authority);
677        sleep(Duration::from_secs(10)).await;
678
679        // Stop all authorities and exit.
680        for authority in authorities {
681            authority.stop().await;
682        }
683    }
684
685    #[rstest]
686    #[tokio::test(flavor = "current_thread")]
687    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
688        telemetry_subscribers::init_for_testing();
689        let db_registry = Registry::new();
690        DBMetrics::init(RegistryService::new(db_registry));
691
692        const NUM_OF_AUTHORITIES: usize = 4;
693        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
694        let mut commit_receivers = vec![];
695        let mut block_receivers = vec![];
696        let mut authorities = BTreeMap::new();
697        let mut temp_dirs = BTreeMap::new();
698        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
699
700        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
701        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
702
703        for (index, _authority_info) in committee.authorities() {
704            let dir = TempDir::new().unwrap();
705            let (authority, commit_receiver, block_receiver) = make_authority(
706                index,
707                &dir,
708                committee.clone(),
709                keypairs.clone(),
710                NetworkType::Tonic,
711                boot_counters[index],
712                protocol_config.clone(),
713            )
714            .await;
715            boot_counters[index] += 1;
716            commit_receivers.push(commit_receiver);
717            block_receivers.push(block_receiver);
718            authorities.insert(index, authority);
719            temp_dirs.insert(index, dir);
720        }
721
722        // Now we take the receiver of authority 1 and we wait until we see at least one block committed from this authority
723        // We wait until we see at least one committed block authored from this authority. That way we'll be 100% sure that
724        // at least one block has been proposed and successfully received by a quorum of nodes.
725        let index_1 = committee.to_authority_index(1).unwrap();
726        'outer: while let Some(result) =
727            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
728                .await
729                .expect("Timed out while waiting for at least one committed block from authority 1")
730        {
731            for block in result.blocks {
732                if block.round() > GENESIS_ROUND && block.author() == index_1 {
733                    break 'outer;
734                }
735            }
736        }
737
738        // Stop authority 1 & 2.
739        // * Authority 1 will be used to wipe out their DB and practically "force" the amnesia recovery.
740        // * Authority 2 is stopped in order to simulate less than f+1 availability which will
741        // make authority 1 retry during amnesia recovery until it has finally managed to successfully get back f+1 responses.
742        // once authority 2 is up and running again.
743        authorities.remove(&index_1).unwrap().stop().await;
744        let index_2 = committee.to_authority_index(2).unwrap();
745        authorities.remove(&index_2).unwrap().stop().await;
746        sleep(Duration::from_secs(5)).await;
747
748        // Authority 1: create a new directory to simulate amnesia. The node will start having participated previously
749        // to consensus but now will attempt to synchronize the last own block and recover from there. It won't be able
750        // to do that successfully as authority 2 is still down.
751        let dir = TempDir::new().unwrap();
752        // We do reset the boot counter for this one to simulate a "binary" restart
753        boot_counters[index_1] = 0;
754        let (authority, mut commit_receiver, _block_receiver) = make_authority(
755            index_1,
756            &dir,
757            committee.clone(),
758            keypairs.clone(),
759            NetworkType::Tonic,
760            boot_counters[index_1],
761            protocol_config.clone(),
762        )
763        .await;
764        boot_counters[index_1] += 1;
765        authorities.insert(index_1, authority);
766        temp_dirs.insert(index_1, dir);
767        sleep(Duration::from_secs(5)).await;
768
769        // Now spin up authority 2 using its earlier directly - so no amnesia recovery should be forced here.
770        // Authority 1 should be able to recover from amnesia successfully.
771        let (authority, _commit_receiver, _block_receiver) = make_authority(
772            index_2,
773            &temp_dirs[&index_2],
774            committee.clone(),
775            keypairs,
776            NetworkType::Tonic,
777            boot_counters[index_2],
778            protocol_config.clone(),
779        )
780        .await;
781        boot_counters[index_2] += 1;
782        authorities.insert(index_2, authority);
783        sleep(Duration::from_secs(5)).await;
784
785        // We wait until we see at least one committed block authored from this authority
786        'outer: while let Some(result) = commit_receiver.recv().await {
787            for block in result.blocks {
788                if block.round() > GENESIS_ROUND && block.author() == index_1 {
789                    break 'outer;
790                }
791            }
792        }
793
794        // Stop all authorities and exit.
795        for (_, authority) in authorities {
796            authority.stop().await;
797        }
798    }
799
800    // TODO: create a fixture
801    async fn make_authority(
802        index: AuthorityIndex,
803        db_dir: &TempDir,
804        committee: Committee,
805        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
806        network_type: NetworkType,
807        boot_counter: u64,
808        protocol_config: ProtocolConfig,
809    ) -> (
810        ConsensusAuthority,
811        UnboundedReceiver<CommittedSubDag>,
812        UnboundedReceiver<CertifiedBlocksOutput>,
813    ) {
814        let registry = Registry::new();
815
816        // Cache less blocks to exercise commit sync.
817        let parameters = Parameters {
818            db_path: db_dir.path().to_path_buf(),
819            dag_state_cached_rounds: 5,
820            commit_sync_parallel_fetches: 2,
821            commit_sync_batch_size: 3,
822            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
823            ..Default::default()
824        };
825        let txn_verifier = NoopTransactionVerifier {};
826
827        let protocol_keypair = keypairs[index].1.clone();
828        let network_keypair = keypairs[index].0.clone();
829
830        let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);
831
832        let authority = ConsensusAuthority::start(
833            network_type,
834            0,
835            index,
836            committee,
837            parameters,
838            protocol_config,
839            protocol_keypair,
840            network_keypair,
841            Arc::new(Clock::default()),
842            Arc::new(txn_verifier),
843            commit_consumer,
844            registry,
845            boot_counter,
846        )
847        .await;
848
849        (authority, commit_receiver, block_receiver)
850    }
851}