consensus_core/authority_node.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Instant};

use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use itertools::Itertools;
use mysten_metrics::spawn_logged_monitored_task;
use parking_lot::RwLock;
use prometheus::Registry;
use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
use tokio::task::JoinHandle;
use tracing::{info, warn};

use crate::{
    CommitConsumerArgs,
    authority_service::AuthorityService,
    block_manager::BlockManager,
    block_verifier::SignedBlockVerifier,
    broadcaster::Broadcaster,
    commit_observer::CommitObserver,
    commit_syncer::{CommitSyncer, CommitSyncerHandle},
    commit_vote_monitor::CommitVoteMonitor,
    context::{Clock, Context},
    core::{Core, CoreSignals},
    core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
    dag_state::DagState,
    leader_schedule::LeaderSchedule,
    leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
    metrics::initialise_metrics,
    network::{
        NetworkClient as _, NetworkManager, anemo_network::AnemoManager,
        tonic_network::TonicManager,
    },
    proposed_block_handler::ProposedBlockHandler,
    round_prober::{RoundProber, RoundProberHandle},
    round_tracker::PeerRoundTracker,
    storage::rocksdb_store::RocksDBStore,
    subscriber::Subscriber,
    synchronizer::{Synchronizer, SynchronizerHandle},
    transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
    transaction_certifier::TransactionCertifier,
};

/// ConsensusAuthority is used by Sui to manage the lifetime of AuthorityNode.
/// It hides the details of the implementation from the caller, MysticetiManager.
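///
/// # Example
///
/// An illustrative lifecycle sketch (start, submit, stop). `epoch_start_timestamp_ms`,
/// `own_index`, `committee`, the keypairs, `protocol_config`, `transaction_verifier`,
/// `commit_consumer` and `transaction_bytes` are assumed to be provided by the caller;
/// the snippet is a sketch and not meant to compile as-is.
///
/// ```ignore
/// let authority = ConsensusAuthority::start(
///     ConsensusNetwork::Tonic,
///     epoch_start_timestamp_ms,
///     own_index,
///     committee,
///     Parameters::default(),
///     protocol_config,
///     protocol_keypair,
///     network_keypair,
///     Arc::new(Clock::default()),
///     transaction_verifier,
///     commit_consumer,
///     Registry::new(),
///     0, // boot_counter: 0 on first boot, which can trigger amnesia recovery if enabled
/// )
/// .await;
///
/// // Submit transactions through the client, then shut down.
/// authority.transaction_client().submit(vec![transaction_bytes]).await?;
/// authority.stop().await;
/// ```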
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    WithAnemo(AuthorityNode<AnemoManager>),
    WithTonic(AuthorityNode<TonicManager>),
}

impl ConsensusAuthority {
    pub async fn start(
        network_type: ConsensusNetwork,
        epoch_start_timestamp_ms: u64,
        own_index: AuthorityIndex,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ProtocolConfig,
        protocol_keypair: ProtocolKeyPair,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumerArgs,
        registry: Registry,
        // Counts how many times the authority node has been booted while the binary
        // (or the component calling `ConsensusAuthority`) has been running. It is mainly
        // used to decide whether amnesia recovery should run: when `boot_counter` is 0,
        // `ConsensusAuthority` initiates amnesia recovery if that is enabled in the parameters.
        boot_counter: u64,
    ) -> Self {
        match network_type {
            ConsensusNetwork::Anemo => {
                let authority = AuthorityNode::start(
                    epoch_start_timestamp_ms,
                    own_index,
                    committee,
                    parameters,
                    protocol_config,
                    protocol_keypair,
                    network_keypair,
                    clock,
                    transaction_verifier,
                    commit_consumer,
                    registry,
                    boot_counter,
                )
                .await;
                Self::WithAnemo(authority)
            }
            ConsensusNetwork::Tonic => {
                let authority = AuthorityNode::start(
                    epoch_start_timestamp_ms,
                    own_index,
                    committee,
                    parameters,
                    protocol_config,
                    protocol_keypair,
                    network_keypair,
                    clock,
                    transaction_verifier,
                    commit_consumer,
                    registry,
                    boot_counter,
                )
                .await;
                Self::WithTonic(authority)
            }
        }
    }

    pub async fn stop(self) {
        match self {
            Self::WithAnemo(authority) => authority.stop().await,
            Self::WithTonic(authority) => authority.stop().await,
        }
    }

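    /// Returns a client handle for submitting transactions to this authority.
    ///
    /// Illustrative usage (a sketch; assumes `authority` is a started
    /// `ConsensusAuthority` and `payload` holds the transaction bytes):
    /// ```ignore
    /// authority.transaction_client().submit(vec![payload]).await?;
    /// ```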
    pub fn transaction_client(&self) -> Arc<TransactionClient> {
        match self {
            Self::WithAnemo(authority) => authority.transaction_client(),
            Self::WithTonic(authority) => authority.transaction_client(),
        }
    }

    #[cfg(test)]
    fn context(&self) -> &Arc<Context> {
        match self {
            Self::WithAnemo(authority) => &authority.context,
            Self::WithTonic(authority) => &authority.context,
        }
    }

    #[allow(unused)]
    fn sync_last_known_own_block_enabled(&self) -> bool {
        match self {
            Self::WithAnemo(authority) => authority.sync_last_known_own_block,
            Self::WithTonic(authority) => authority.sync_last_known_own_block,
        }
    }
}

pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    context: Arc<Context>,
    start_time: Instant,
    transaction_client: Arc<TransactionClient>,
    synchronizer: Arc<SynchronizerHandle>,

    commit_syncer_handle: CommitSyncerHandle,
    round_prober_handle: RoundProberHandle,
    proposed_block_handler: JoinHandle<()>,
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    core_thread_handle: CoreThreadHandle,
    // Only one of broadcaster and subscriber is created, depending on
    // whether streaming is supported.
    broadcaster: Option<Broadcaster>,
    subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
    network_manager: N,
    sync_last_known_own_block: bool,
}

impl<N> AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    pub(crate) async fn start(
        epoch_start_timestamp_ms: u64,
        own_index: AuthorityIndex,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ProtocolConfig,
        // To avoid accidentally leaking the private key, the protocol key pair should only be
        // kept in Core.
        protocol_keypair: ProtocolKeyPair,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumerArgs,
        registry: Registry,
        boot_counter: u64,
    ) -> Self {
        assert!(
            committee.is_valid_index(own_index),
            "Invalid own index {}",
            own_index
        );
        let own_hostname = committee.authority(own_index).hostname.clone();
        info!(
            "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
            own_index,
            own_hostname,
            protocol_config.version,
            epoch_start_timestamp_ms,
            boot_counter,
            commit_consumer.replay_after_commit_index,
            commit_consumer.consumer_last_processed_commit_index
        );
        info!(
            "Consensus authorities: {}",
            committee
                .authorities()
                .map(|(i, a)| format!("{}: {}", i, a.hostname))
                .join(", ")
        );
        info!("Consensus parameters: {:?}", parameters);
        info!("Consensus committee: {:?}", committee);
        let context = Arc::new(Context::new(
            epoch_start_timestamp_ms,
            own_index,
            committee,
            parameters,
            protocol_config,
            initialise_metrics(registry),
            clock,
        ));
        let start_time = Instant::now();

        context
            .metrics
            .node_metrics
            .authority_index
            .with_label_values(&[&own_hostname])
            .set(context.own_index.value() as i64);
        context
            .metrics
            .node_metrics
            .protocol_version
            .set(context.protocol_config.version.as_u64() as i64);

        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

        let mut network_manager = N::new(context.clone(), network_keypair);
        let network_client = network_manager.client();

        // REQUIRED: Broadcaster must be created before Core, so that it is already listening
        // on the broadcast channel; otherwise blocks can be missed, causing test failures.
        let broadcaster = if N::Client::SUPPORT_STREAMING {
            None
        } else {
            Some(Broadcaster::new(
                context.clone(),
                network_client.clone(),
                &signals_receivers,
            ))
        };

        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
        let store = Arc::new(RocksDBStore::new(store_path));
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_verifier = Arc::new(SignedBlockVerifier::new(
            context.clone(),
            transaction_verifier,
        ));

        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            commit_consumer.block_sender.clone(),
        );

        let mut proposed_block_handler = ProposedBlockHandler::new(
            context.clone(),
            signals_receivers.block_broadcast_receiver(),
            transaction_certifier.clone(),
        );

        let proposed_block_handler =
            spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");

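        // Sync the last known own block (amnesia recovery) only on the first boot of
        // this process with an empty DB, and only when a non-zero timeout is configured.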
        let sync_last_known_own_block = boot_counter == 0
            && dag_state.read().highest_accepted_round() == 0
            && !context
                .parameters
                .sync_last_known_own_block_timeout
                .is_zero();
        info!("Sync last known own block: {sync_last_known_own_block}");

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let commit_consumer_monitor = commit_consumer.monitor();
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));

        let core = Core::new(
            context.clone(),
            leader_schedule,
            tx_consumer,
            transaction_certifier.clone(),
            block_manager,
            // For streaming RPC, Core will be notified when consumer is available.
            // For non-streaming RPC, there is no way to know so default to true.
            // When there is only one (this) authority, assume subscriber exists.
            !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
            commit_observer,
            core_signals,
            protocol_keypair,
            dag_state.clone(),
            sync_last_known_own_block,
            round_tracker.clone(),
        );

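        // Core runs on its own dedicated thread; other components reach it through the
        // channel-based dispatcher, and the returned handle stops the thread on shutdown.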
        let (core_dispatcher, core_thread_handle) =
            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
        let core_dispatcher = Arc::new(core_dispatcher);
        let leader_timeout_handle =
            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());

        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        let synchronizer = Synchronizer::start(
            network_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            sync_last_known_own_block,
        );

        let commit_syncer_handle = CommitSyncer::new(
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            network_client.clone(),
            dag_state.clone(),
        )
        .start();

        let round_prober_handle = RoundProber::new(
            context.clone(),
            core_dispatcher.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            network_client.clone(),
        )
        .start();

        let network_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker.clone(),
            synchronizer.clone(),
            core_dispatcher,
            signals_receivers.block_broadcast_receiver(),
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let subscriber = if N::Client::SUPPORT_STREAMING {
            let s = Subscriber::new(
                context.clone(),
                network_client,
                network_service.clone(),
                dag_state,
            );
            for (peer, _) in context.committee.authorities() {
                if peer != context.own_index {
                    s.subscribe(peer);
                }
            }
            Some(s)
        } else {
            None
        };

        network_manager.install_service(network_service).await;

        info!(
            "Consensus authority started, took {:?}",
            start_time.elapsed()
        );

        Self {
            context,
            start_time,
            transaction_client: Arc::new(tx_client),
            synchronizer,
            commit_syncer_handle,
            round_prober_handle,
            proposed_block_handler,
            leader_timeout_handle,
            core_thread_handle,
            broadcaster,
            subscriber,
            network_manager,
            sync_last_known_own_block,
        }
    }

    pub(crate) async fn stop(mut self) {
        info!(
            "Stopping authority. Total run time: {:?}",
            self.start_time.elapsed()
        );

        // First, shut down components that call into Core.
        if let Err(e) = self.synchronizer.stop().await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
            warn!(
                "Failed to stop synchronizer when shutting down consensus: {:?}",
                e
            );
        };
        self.commit_syncer_handle.stop().await;
        self.round_prober_handle.stop().await;
        self.proposed_block_handler.abort();
        self.leader_timeout_handle.stop().await;
        // Shut down Core to stop block production and broadcasting.
        // When using streaming, all subscribers to broadcasted blocks stop after this.
        self.core_thread_handle.stop().await;
        if let Some(mut broadcaster) = self.broadcaster.take() {
            broadcaster.stop();
        }
        // Stop outgoing long-lived streams before stopping the network server.
        if let Some(subscriber) = self.subscriber.take() {
            subscriber.stop();
        }
        self.network_manager.stop().await;

        self.context
            .metrics
            .node_metrics
            .uptime
            .observe(self.start_time.elapsed().as_secs_f64());
    }

    pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
        self.transaction_client.clone()
    }
}

#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use mysten_metrics::RegistryService;
    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
    use prometheus::Registry;
    use rstest::rstest;
    use sui_protocol_config::ProtocolConfig;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    // TODO: build AuthorityFixture.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut commit_receivers = Vec::with_capacity(committee.size());
        let mut block_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in &mut commit_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 and let it run.
        let (authority, commit_receiver, block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        commit_receivers[index] = commit_receiver;
        block_receivers[index] = block_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, _block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in &mut output_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 and let it run.
        let (authority, commit_receiver, _block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut block_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                ConsensusNetwork::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            assert!(
                authority.sync_last_known_own_block_enabled(),
                "Expected syncing of the last known own block to be enabled, as all authorities have empty DBs and are booting for the first time."
            );
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Take the commit receiver of authority 1 and wait until we see at least one committed
        // block authored by that authority. That way we can be certain that at least one of its
        // blocks has been proposed and successfully received by a quorum of nodes.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 & 2.
        // * Authority 1 will have its DB wiped out, to practically "force" amnesia recovery.
        // * Authority 2 is stopped to simulate less than f+1 availability, which makes
        //   authority 1 retry during amnesia recovery until it finally manages to get back
        //   f+1 responses, once authority 2 is up and running again.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Authority 1: create a new directory to simulate amnesia. The node has participated
        // in consensus before, but will now attempt to sync its last own block and recover
        // from there. It won't be able to do that successfully while authority 2 is still down.
        let dir = TempDir::new().unwrap();
        // Reset the boot counter for this node to simulate a "binary" restart.
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver, _block_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            ConsensusNetwork::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        assert!(
            authority.sync_last_known_own_block_enabled(),
            "Authority should have the sync of last own block enabled"
        );
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Now spin up authority 2 using its earlier directory, so no amnesia recovery should be
        // forced here. Authority 1 should then be able to recover from amnesia successfully.
        let (authority, _commit_receiver, _block_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            ConsensusNetwork::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        assert!(
            !authority.sync_last_known_own_block_enabled(),
            "Authority should not have attempted to sync the last own block"
        );
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait until we see at least one committed block authored by authority 1.
        'outer: while let Some(result) = commit_receiver.recv().await {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    // TODO: create a fixture
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: ConsensusNetwork,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (
        ConsensusAuthority,
        UnboundedReceiver<CommittedSubDag>,
        UnboundedReceiver<CertifiedBlocksOutput>,
    ) {
        let registry = Registry::new();

        // Cache fewer blocks to exercise commit sync.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, commit_receiver, block_receiver)
    }
}