consensus_core/
authority_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7    AuthorityIndex, Committee, NetworkKeyPair, NetworkPublicKey, Parameters, ProtocolKeyPair,
8};
9use consensus_types::block::Round;
10use itertools::Itertools;
11use mysten_metrics::spawn_logged_monitored_task;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use sui_protocol_config::ProtocolConfig;
16use tokio::task::JoinHandle;
17use tracing::{info, warn};
18
19use crate::{
20    BlockAPI as _, CommitConsumerArgs,
21    authority_service::AuthorityService,
22    block_manager::BlockManager,
23    block_verifier::SignedBlockVerifier,
24    commit_observer::CommitObserver,
25    commit_syncer::{CommitSyncer, CommitSyncerHandle},
26    commit_vote_monitor::CommitVoteMonitor,
27    context::{Clock, Context},
28    core::{Core, CoreSignals},
29    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
30    dag_state::DagState,
31    leader_schedule::LeaderSchedule,
32    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
33    metrics::initialise_metrics,
34    network::{NetworkManager, tonic_network::TonicManager},
35    proposed_block_handler::ProposedBlockHandler,
36    round_prober::{RoundProber, RoundProberHandle},
37    round_tracker::RoundTracker,
38    storage::rocksdb_store::RocksDBStore,
39    subscriber::Subscriber,
40    synchronizer::{Synchronizer, SynchronizerHandle},
41    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
42    transaction_certifier::TransactionCertifier,
43};
44
/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
///
/// Each variant wraps an `AuthorityNode` specialised for one network manager; the
/// methods on this enum simply dispatch to the wrapped node.
// `AuthorityNode` is `pub(crate)` while this enum is `pub`, so the variant payload
// exposes a crate-private type; the lint is presumably silenced for that reason.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Authority node whose networking is provided by `TonicManager`.
    WithTonic(AuthorityNode<TonicManager>),
}
51
52impl ConsensusAuthority {
53    pub async fn start(
54        network_type: NetworkType,
55        epoch_start_timestamp_ms: u64,
56        own_index: AuthorityIndex,
57        committee: Committee,
58        parameters: Parameters,
59        protocol_config: ProtocolConfig,
60        protocol_keypair: ProtocolKeyPair,
61        network_keypair: NetworkKeyPair,
62        clock: Arc<Clock>,
63        transaction_verifier: Arc<dyn TransactionVerifier>,
64        commit_consumer: CommitConsumerArgs,
65        registry: Registry,
66        // A counter that keeps track of how many times the consensus authority has been booted while the process
67        // has been running. It's useful for making decisions on whether amnesia recovery should run.
68        // When `boot_counter` is 0, `ConsensusAuthority` will initiate the process of amnesia recovery if that's enabled in the parameters.
69        boot_counter: u64,
70    ) -> Self {
71        match network_type {
72            NetworkType::Tonic => {
73                let authority = AuthorityNode::start(
74                    epoch_start_timestamp_ms,
75                    own_index,
76                    committee,
77                    parameters,
78                    protocol_config,
79                    protocol_keypair,
80                    network_keypair,
81                    clock,
82                    transaction_verifier,
83                    commit_consumer,
84                    registry,
85                    boot_counter,
86                )
87                .await;
88                Self::WithTonic(authority)
89            }
90        }
91    }
92
93    pub async fn stop(self) {
94        match self {
95            Self::WithTonic(authority) => authority.stop().await,
96        }
97    }
98
99    pub fn update_peer_address(
100        &self,
101        network_pubkey: NetworkPublicKey,
102        address: Option<Multiaddr>,
103    ) {
104        match self {
105            Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
106        }
107    }
108
109    pub fn transaction_client(&self) -> Arc<TransactionClient> {
110        match self {
111            Self::WithTonic(authority) => authority.transaction_client(),
112        }
113    }
114
115    #[cfg(test)]
116    fn context(&self) -> &Arc<Context> {
117        match self {
118            Self::WithTonic(authority) => &authority.context,
119        }
120    }
121}
122
/// Selects which network stack `ConsensusAuthority::start` builds the node on.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    /// Start the node as `ConsensusAuthority::WithTonic`.
    Tonic,
}
127
/// A running consensus authority, generic over the network manager `N`.
///
/// Holds the handles of the components created in `start()` so that `stop()` can shut
/// them down in a defined order.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    /// Shared context; read for the committee, own index and metrics.
    context: Arc<Context>,
    /// Captured in `start()`; used to log startup/run time and to record uptime on stop.
    start_time: Instant,
    /// Client handed out by `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Synchronizer handle; it is the first component stopped in `stop()`.
    synchronizer: Arc<SynchronizerHandle>,

    /// Handle used to stop the commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle used to stop the round prober.
    round_prober_handle: RoundProberHandle,
    /// Task running `ProposedBlockHandler::run`; aborted in `stop()`.
    proposed_block_handler: JoinHandle<()>,
    /// Handle used to stop the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle used to stop the core thread.
    core_thread_handle: CoreThreadHandle,
    /// Block subscriptions to peers; stopped before the network manager.
    subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
    /// Network manager hosting the installed `AuthorityService`; stopped last.
    network_manager: N,
}
145
146impl<N> AuthorityNode<N>
147where
148    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
149{
150    // See comments above ConsensusAuthority::start() for details on the input.
151    pub(crate) async fn start(
152        epoch_start_timestamp_ms: u64,
153        own_index: AuthorityIndex,
154        committee: Committee,
155        parameters: Parameters,
156        protocol_config: ProtocolConfig,
157        protocol_keypair: ProtocolKeyPair,
158        network_keypair: NetworkKeyPair,
159        clock: Arc<Clock>,
160        transaction_verifier: Arc<dyn TransactionVerifier>,
161        commit_consumer: CommitConsumerArgs,
162        registry: Registry,
163        boot_counter: u64,
164    ) -> Self {
165        assert!(
166            committee.is_valid_index(own_index),
167            "Invalid own index {}",
168            own_index
169        );
170        let own_hostname = committee.authority(own_index).hostname.clone();
171        info!(
172            "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
173            own_index,
174            own_hostname,
175            protocol_config.version,
176            epoch_start_timestamp_ms,
177            boot_counter,
178            commit_consumer.replay_after_commit_index,
179            commit_consumer.consumer_last_processed_commit_index
180        );
181        info!(
182            "Consensus authorities: {}",
183            committee
184                .authorities()
185                .map(|(i, a)| format!("{}: {}", i, a.hostname))
186                .join(", ")
187        );
188        info!("Consensus parameters: {:?}", parameters);
189        info!("Consensus committee: {:?}", committee);
190        let context = Arc::new(Context::new(
191            epoch_start_timestamp_ms,
192            own_index,
193            committee,
194            parameters,
195            protocol_config,
196            initialise_metrics(registry),
197            clock,
198        ));
199        let start_time = Instant::now();
200
201        context
202            .metrics
203            .node_metrics
204            .authority_index
205            .with_label_values(&[&own_hostname])
206            .set(context.own_index.value() as i64);
207        context
208            .metrics
209            .node_metrics
210            .protocol_version
211            .set(context.protocol_config.version.as_u64() as i64);
212
213        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
214        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
215
216        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
217
218        let mut network_manager = N::new(context.clone(), network_keypair);
219        let network_client = network_manager.client();
220
221        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
222        let store = Arc::new(RocksDBStore::new(store_path));
223        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
224
225        let block_verifier = Arc::new(SignedBlockVerifier::new(
226            context.clone(),
227            transaction_verifier,
228        ));
229
230        let transaction_certifier = TransactionCertifier::new(
231            context.clone(),
232            block_verifier.clone(),
233            dag_state.clone(),
234            commit_consumer.block_sender.clone(),
235        );
236
237        let mut proposed_block_handler = ProposedBlockHandler::new(
238            context.clone(),
239            signals_receivers.block_broadcast_receiver(),
240            transaction_certifier.clone(),
241        );
242
243        let proposed_block_handler =
244            spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
245
246        let sync_last_known_own_block = boot_counter == 0
247            && !context
248                .parameters
249                .sync_last_known_own_block_timeout
250                .is_zero();
251        info!(
252            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
253            sync_last_known_own_block,
254            boot_counter,
255            context.parameters.sync_last_known_own_block_timeout
256        );
257
258        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
259
260        let leader_schedule = Arc::new(LeaderSchedule::from_store(
261            context.clone(),
262            dag_state.clone(),
263        ));
264
265        let commit_consumer_monitor = commit_consumer.monitor();
266        let commit_observer = CommitObserver::new(
267            context.clone(),
268            commit_consumer,
269            dag_state.clone(),
270            transaction_certifier.clone(),
271            leader_schedule.clone(),
272        )
273        .await;
274
275        let initial_received_rounds = dag_state
276            .read()
277            .get_last_cached_block_per_authority(Round::MAX)
278            .into_iter()
279            .map(|(block, _)| block.round())
280            .collect::<Vec<_>>();
281        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
282            context.clone(),
283            initial_received_rounds,
284        )));
285
286        // To avoid accidentally leaking the private key, the protocol key pair should only be
287        // kept in Core.
288        let core = Core::new(
289            context.clone(),
290            leader_schedule,
291            tx_consumer,
292            transaction_certifier.clone(),
293            block_manager,
294            commit_observer,
295            core_signals,
296            protocol_keypair,
297            dag_state.clone(),
298            sync_last_known_own_block,
299            round_tracker.clone(),
300        );
301
302        let (core_dispatcher, core_thread_handle) =
303            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
304        let core_dispatcher = Arc::new(core_dispatcher);
305        let leader_timeout_handle =
306            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
307
308        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
309
310        let synchronizer = Synchronizer::start(
311            network_client.clone(),
312            context.clone(),
313            core_dispatcher.clone(),
314            commit_vote_monitor.clone(),
315            block_verifier.clone(),
316            transaction_certifier.clone(),
317            round_tracker.clone(),
318            dag_state.clone(),
319            sync_last_known_own_block,
320        );
321
322        let commit_syncer_handle = CommitSyncer::new(
323            context.clone(),
324            core_dispatcher.clone(),
325            commit_vote_monitor.clone(),
326            commit_consumer_monitor.clone(),
327            block_verifier.clone(),
328            transaction_certifier.clone(),
329            round_tracker.clone(),
330            network_client.clone(),
331            dag_state.clone(),
332        )
333        .start();
334
335        let round_prober_handle = RoundProber::new(
336            context.clone(),
337            core_dispatcher.clone(),
338            round_tracker.clone(),
339            dag_state.clone(),
340            network_client.clone(),
341        )
342        .start();
343
344        let network_service = Arc::new(AuthorityService::new(
345            context.clone(),
346            block_verifier,
347            commit_vote_monitor,
348            round_tracker.clone(),
349            synchronizer.clone(),
350            core_dispatcher,
351            signals_receivers.block_broadcast_receiver(),
352            transaction_certifier,
353            dag_state.clone(),
354            store,
355        ));
356
357        let subscriber = {
358            let s = Subscriber::new(
359                context.clone(),
360                network_client,
361                network_service.clone(),
362                dag_state,
363            );
364            for (peer, _) in context.committee.authorities() {
365                if peer != context.own_index {
366                    s.subscribe(peer);
367                }
368            }
369            s
370        };
371
372        network_manager.install_service(network_service).await;
373
374        info!(
375            "Consensus authority started, took {:?}",
376            start_time.elapsed()
377        );
378
379        Self {
380            context,
381            start_time,
382            transaction_client: Arc::new(tx_client),
383            synchronizer,
384            commit_syncer_handle,
385            round_prober_handle,
386            proposed_block_handler,
387            leader_timeout_handle,
388            core_thread_handle,
389            subscriber,
390            network_manager,
391        }
392    }
393
394    pub(crate) async fn stop(mut self) {
395        info!(
396            "Stopping authority. Total run time: {:?}",
397            self.start_time.elapsed()
398        );
399
400        // First shutdown components calling into Core.
401        if let Err(e) = self.synchronizer.stop().await {
402            if e.is_panic() {
403                std::panic::resume_unwind(e.into_panic());
404            }
405            warn!(
406                "Failed to stop synchronizer when shutting down consensus: {:?}",
407                e
408            );
409        };
410        self.commit_syncer_handle.stop().await;
411        self.round_prober_handle.stop().await;
412        self.proposed_block_handler.abort();
413        self.leader_timeout_handle.stop().await;
414        // Shutdown Core to stop block productions and broadcast.
415        self.core_thread_handle.stop().await;
416        // Stop block subscriptions before stopping network server.
417        self.subscriber.stop();
418        self.network_manager.stop().await;
419
420        self.context
421            .metrics
422            .node_metrics
423            .uptime
424            .observe(self.start_time.elapsed().as_secs_f64());
425    }
426
427    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
428        self.transaction_client.clone()
429    }
430
431    pub(crate) fn update_peer_address(
432        &self,
433        network_pubkey: NetworkPublicKey,
434        address: Option<Multiaddr>,
435    ) {
436        // Find the peer index for this network key
437        let Some(peer) = self
438            .context
439            .committee
440            .authorities()
441            .find(|(_, authority)| authority.network_key == network_pubkey)
442            .map(|(index, _)| index)
443        else {
444            warn!(
445                "Network public key {:?} not found in committee, ignoring address update",
446                network_pubkey
447            );
448            return;
449        };
450
451        // Update the address in the network manager
452        self.network_manager.update_peer_address(peer, address);
453
454        // Re-subscribe to the peer to force reconnection with new address
455        if peer != self.context.own_index {
456            info!("Re-subscribing to peer {} after address update", peer);
457            self.subscriber.subscribe(peer);
458        }
459    }
460}
461
462#[cfg(test)]
463mod tests {
464    #![allow(non_snake_case)]
465
466    use std::{
467        collections::{BTreeMap, BTreeSet},
468        sync::Arc,
469        time::Duration,
470    };
471
472    use consensus_config::{Parameters, local_committee_and_keys};
473    use mysten_metrics::RegistryService;
474    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
475    use prometheus::Registry;
476    use rstest::rstest;
477    use sui_protocol_config::ProtocolConfig;
478    use tempfile::TempDir;
479    use tokio::time::{sleep, timeout};
480    use typed_store::DBMetrics;
481
482    use super::*;
483    use crate::{
484        CommittedSubDag,
485        block::{BlockAPI as _, GENESIS_ROUND},
486        transaction::NoopTransactionVerifier,
487    };
488
489    #[rstest]
490    #[tokio::test]
491    async fn test_authority_start_and_stop(
492        #[values(NetworkType::Tonic)] network_type: NetworkType,
493    ) {
494        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
495        let registry = Registry::new();
496
497        let temp_dir = TempDir::new().unwrap();
498        let parameters = Parameters {
499            db_path: temp_dir.keep(),
500            ..Default::default()
501        };
502        let txn_verifier = NoopTransactionVerifier {};
503
504        let own_index = committee.to_authority_index(0).unwrap();
505        let protocol_keypair = keypairs[own_index].1.clone();
506        let network_keypair = keypairs[own_index].0.clone();
507
508        let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
509
510        let authority = ConsensusAuthority::start(
511            network_type,
512            0,
513            own_index,
514            committee,
515            parameters,
516            ProtocolConfig::get_for_max_version_UNSAFE(),
517            protocol_keypair,
518            network_keypair,
519            Arc::new(Clock::default()),
520            Arc::new(txn_verifier),
521            commit_consumer,
522            registry,
523            0,
524        )
525        .await;
526
527        assert_eq!(authority.context().own_index, own_index);
528        assert_eq!(authority.context().committee.epoch(), 0);
529        assert_eq!(authority.context().committee.size(), 1);
530
531        authority.stop().await;
532    }
533
534    // TODO: build AuthorityFixture.
535    #[rstest]
536    #[tokio::test(flavor = "current_thread")]
537    async fn test_authority_committee(
538        #[values(NetworkType::Tonic)] network_type: NetworkType,
539        #[values(5, 10)] gc_depth: u32,
540    ) {
541        telemetry_subscribers::init_for_testing();
542        let db_registry = Registry::new();
543        DBMetrics::init(RegistryService::new(db_registry));
544
545        const NUM_OF_AUTHORITIES: usize = 4;
546        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
547        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
548        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
549
550        let temp_dirs = (0..NUM_OF_AUTHORITIES)
551            .map(|_| TempDir::new().unwrap())
552            .collect::<Vec<_>>();
553
554        let mut commit_receivers = Vec::with_capacity(committee.size());
555        let mut authorities = Vec::with_capacity(committee.size());
556        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
557
558        for (index, _authority_info) in committee.authorities() {
559            let (authority, commit_receiver) = make_authority(
560                index,
561                &temp_dirs[index.value()],
562                committee.clone(),
563                keypairs.clone(),
564                network_type,
565                boot_counters[index],
566                protocol_config.clone(),
567            )
568            .await;
569            boot_counters[index] += 1;
570            commit_receivers.push(commit_receiver);
571            authorities.push(authority);
572        }
573
574        const NUM_TRANSACTIONS: u8 = 15;
575        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
576        for i in 0..NUM_TRANSACTIONS {
577            let txn = vec![i; 16];
578            submitted_transactions.insert(txn.clone());
579            authorities[i as usize % authorities.len()]
580                .transaction_client()
581                .submit(vec![txn])
582                .await
583                .unwrap();
584        }
585
586        for receiver in &mut commit_receivers {
587            let mut expected_transactions = submitted_transactions.clone();
588            loop {
589                let committed_subdag =
590                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
591                        .await
592                        .unwrap()
593                        .unwrap();
594                for b in committed_subdag.blocks {
595                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
596                        assert!(
597                            expected_transactions.remove(&txn),
598                            "Transaction not submitted or already seen: {:?}",
599                            txn
600                        );
601                    }
602                }
603                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
604                if expected_transactions.is_empty() {
605                    break;
606                }
607            }
608        }
609
610        // Stop authority 1.
611        let index = committee.to_authority_index(1).unwrap();
612        authorities.remove(index.value()).stop().await;
613        sleep(Duration::from_secs(10)).await;
614
615        // Restart authority 1 and let it run.
616        let (authority, commit_receiver) = make_authority(
617            index,
618            &temp_dirs[index.value()],
619            committee.clone(),
620            keypairs.clone(),
621            network_type,
622            boot_counters[index],
623            protocol_config.clone(),
624        )
625        .await;
626        boot_counters[index] += 1;
627        commit_receivers[index] = commit_receiver;
628        authorities.insert(index.value(), authority);
629        sleep(Duration::from_secs(10)).await;
630
631        // Stop all authorities and exit.
632        for authority in authorities {
633            authority.stop().await;
634        }
635    }
636
637    #[rstest]
638    #[tokio::test(flavor = "current_thread")]
639    async fn test_small_committee(
640        #[values(NetworkType::Tonic)] network_type: NetworkType,
641        #[values(1, 2, 3)] num_authorities: usize,
642    ) {
643        telemetry_subscribers::init_for_testing();
644        let db_registry = Registry::new();
645        DBMetrics::init(RegistryService::new(db_registry));
646
647        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
648        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
649
650        let temp_dirs = (0..num_authorities)
651            .map(|_| TempDir::new().unwrap())
652            .collect::<Vec<_>>();
653
654        let mut output_receivers = Vec::with_capacity(committee.size());
655        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
656        let mut boot_counters = vec![0; num_authorities];
657
658        for (index, _authority_info) in committee.authorities() {
659            let (authority, commit_receiver) = make_authority(
660                index,
661                &temp_dirs[index.value()],
662                committee.clone(),
663                keypairs.clone(),
664                network_type,
665                boot_counters[index],
666                protocol_config.clone(),
667            )
668            .await;
669            boot_counters[index] += 1;
670            output_receivers.push(commit_receiver);
671            authorities.push(authority);
672        }
673
674        const NUM_TRANSACTIONS: u8 = 15;
675        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
676        for i in 0..NUM_TRANSACTIONS {
677            let txn = vec![i; 16];
678            submitted_transactions.insert(txn.clone());
679            authorities[i as usize % authorities.len()]
680                .transaction_client()
681                .submit(vec![txn])
682                .await
683                .unwrap();
684        }
685
686        for receiver in &mut output_receivers {
687            let mut expected_transactions = submitted_transactions.clone();
688            loop {
689                let committed_subdag =
690                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
691                        .await
692                        .unwrap()
693                        .unwrap();
694                for b in committed_subdag.blocks {
695                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
696                        assert!(
697                            expected_transactions.remove(&txn),
698                            "Transaction not submitted or already seen: {:?}",
699                            txn
700                        );
701                    }
702                }
703                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
704                if expected_transactions.is_empty() {
705                    break;
706                }
707            }
708        }
709
710        // Stop authority 0.
711        let index = committee.to_authority_index(0).unwrap();
712        authorities.remove(index.value()).stop().await;
713        sleep(Duration::from_secs(10)).await;
714
715        // Restart authority 0 and let it run.
716        let (authority, commit_receiver) = make_authority(
717            index,
718            &temp_dirs[index.value()],
719            committee.clone(),
720            keypairs.clone(),
721            network_type,
722            boot_counters[index],
723            protocol_config.clone(),
724        )
725        .await;
726        boot_counters[index] += 1;
727        output_receivers[index] = commit_receiver;
728        authorities.insert(index.value(), authority);
729        sleep(Duration::from_secs(10)).await;
730
731        // Stop all authorities and exit.
732        for authority in authorities {
733            authority.stop().await;
734        }
735    }
736
737    #[rstest]
738    #[tokio::test(flavor = "current_thread")]
739    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
740        telemetry_subscribers::init_for_testing();
741        let db_registry = Registry::new();
742        DBMetrics::init(RegistryService::new(db_registry));
743
744        const NUM_OF_AUTHORITIES: usize = 4;
745        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
746        let mut commit_receivers = vec![];
747        let mut authorities = BTreeMap::new();
748        let mut temp_dirs = BTreeMap::new();
749        let mut boot_counters = [0; NUM_OF_AUTHORITIES];
750
751        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
752        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
753
754        for (index, _authority_info) in committee.authorities() {
755            let dir = TempDir::new().unwrap();
756            let (authority, commit_receiver) = make_authority(
757                index,
758                &dir,
759                committee.clone(),
760                keypairs.clone(),
761                NetworkType::Tonic,
762                boot_counters[index],
763                protocol_config.clone(),
764            )
765            .await;
766            boot_counters[index] += 1;
767            commit_receivers.push(commit_receiver);
768            authorities.insert(index, authority);
769            temp_dirs.insert(index, dir);
770        }
771
772        // Now we take the receiver of authority 1 and we wait until we see at least one block committed from this authority
773        // We wait until we see at least one committed block authored from this authority. That way we'll be 100% sure that
774        // at least one block has been proposed and successfully received by a quorum of nodes.
775        let index_1 = committee.to_authority_index(1).unwrap();
776        'outer: while let Some(result) =
777            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
778                .await
779                .expect("Timed out while waiting for at least one committed block from authority 1")
780        {
781            for block in result.blocks {
782                if block.round() > GENESIS_ROUND && block.author() == index_1 {
783                    break 'outer;
784                }
785            }
786        }
787
788        // Stop authority 1 & 2.
789        // * Authority 1 will be used to wipe out their DB and practically "force" the amnesia recovery.
790        // * Authority 2 is stopped in order to simulate less than f+1 availability which will
791        // make authority 1 retry during amnesia recovery until it has finally managed to successfully get back f+1 responses.
792        // once authority 2 is up and running again.
793        authorities.remove(&index_1).unwrap().stop().await;
794        let index_2 = committee.to_authority_index(2).unwrap();
795        authorities.remove(&index_2).unwrap().stop().await;
796        sleep(Duration::from_secs(5)).await;
797
798        // Authority 1: create a new directory to simulate amnesia. The node will start having participated previously
799        // to consensus but now will attempt to synchronize the last own block and recover from there. It won't be able
800        // to do that successfully as authority 2 is still down.
801        let dir = TempDir::new().unwrap();
802        // We do reset the boot counter for this one to simulate a "binary" restart
803        boot_counters[index_1] = 0;
804        let (authority, mut commit_receiver) = make_authority(
805            index_1,
806            &dir,
807            committee.clone(),
808            keypairs.clone(),
809            NetworkType::Tonic,
810            boot_counters[index_1],
811            protocol_config.clone(),
812        )
813        .await;
814        boot_counters[index_1] += 1;
815        authorities.insert(index_1, authority);
816        temp_dirs.insert(index_1, dir);
817        sleep(Duration::from_secs(5)).await;
818
819        // Now spin up authority 2 using its earlier directly - so no amnesia recovery should be forced here.
820        // Authority 1 should be able to recover from amnesia successfully.
821        let (authority, _commit_receiver) = make_authority(
822            index_2,
823            &temp_dirs[&index_2],
824            committee.clone(),
825            keypairs,
826            NetworkType::Tonic,
827            boot_counters[index_2],
828            protocol_config.clone(),
829        )
830        .await;
831        boot_counters[index_2] += 1;
832        authorities.insert(index_2, authority);
833        sleep(Duration::from_secs(5)).await;
834
835        // We wait until we see at least one committed block authored from this authority
836        'outer: while let Some(result) = commit_receiver.recv().await {
837            for block in result.blocks {
838                if block.round() > GENESIS_ROUND && block.author() == index_1 {
839                    break 'outer;
840                }
841            }
842        }
843
844        // Stop all authorities and exit.
845        for (_, authority) in authorities {
846            authority.stop().await;
847        }
848    }
849
850    // TODO: create a fixture
851    async fn make_authority(
852        index: AuthorityIndex,
853        db_dir: &TempDir,
854        committee: Committee,
855        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
856        network_type: NetworkType,
857        boot_counter: u64,
858        protocol_config: ProtocolConfig,
859    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
860        let registry = Registry::new();
861
862        // Cache less blocks to exercise commit sync.
863        let parameters = Parameters {
864            db_path: db_dir.path().to_path_buf(),
865            dag_state_cached_rounds: 5,
866            commit_sync_parallel_fetches: 2,
867            commit_sync_batch_size: 3,
868            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
869            ..Default::default()
870        };
871        let txn_verifier = NoopTransactionVerifier {};
872
873        let protocol_keypair = keypairs[index].1.clone();
874        let network_keypair = keypairs[index].0.clone();
875
876        let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
877
878        let authority = ConsensusAuthority::start(
879            network_type,
880            0,
881            index,
882            committee,
883            parameters,
884            protocol_config,
885            protocol_keypair,
886            network_keypair,
887            Arc::new(Clock::default()),
888            Arc::new(txn_verifier),
889            commit_consumer,
890            registry,
891            boot_counter,
892        )
893        .await;
894
895        (authority, commit_receiver)
896    }
897}