1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
7use itertools::Itertools;
8use mysten_metrics::spawn_logged_monitored_task;
9use parking_lot::RwLock;
10use prometheus::Registry;
11use sui_protocol_config::ProtocolConfig;
12use tokio::task::JoinHandle;
13use tracing::{info, warn};
14
15use crate::{
16 CommitConsumerArgs,
17 authority_service::AuthorityService,
18 block_manager::BlockManager,
19 block_verifier::SignedBlockVerifier,
20 commit_observer::CommitObserver,
21 commit_syncer::{CommitSyncer, CommitSyncerHandle},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::{Clock, Context},
24 core::{Core, CoreSignals},
25 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26 dag_state::DagState,
27 leader_schedule::LeaderSchedule,
28 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29 metrics::initialise_metrics,
30 network::{NetworkManager, tonic_network::TonicManager},
31 proposed_block_handler::ProposedBlockHandler,
32 round_prober::{RoundProber, RoundProberHandle},
33 round_tracker::PeerRoundTracker,
34 storage::rocksdb_store::RocksDBStore,
35 subscriber::Subscriber,
36 synchronizer::{Synchronizer, SynchronizerHandle},
37 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
38 transaction_certifier::TransactionCertifier,
39};
40
41#[allow(private_interfaces)]
44pub enum ConsensusAuthority {
45 WithTonic(AuthorityNode<TonicManager>),
46}
47
48impl ConsensusAuthority {
49 pub async fn start(
50 network_type: NetworkType,
51 epoch_start_timestamp_ms: u64,
52 own_index: AuthorityIndex,
53 committee: Committee,
54 parameters: Parameters,
55 protocol_config: ProtocolConfig,
56 protocol_keypair: ProtocolKeyPair,
57 network_keypair: NetworkKeyPair,
58 clock: Arc<Clock>,
59 transaction_verifier: Arc<dyn TransactionVerifier>,
60 commit_consumer: CommitConsumerArgs,
61 registry: Registry,
62 boot_counter: u64,
66 ) -> Self {
67 match network_type {
68 NetworkType::Tonic => {
69 let authority = AuthorityNode::start(
70 epoch_start_timestamp_ms,
71 own_index,
72 committee,
73 parameters,
74 protocol_config,
75 protocol_keypair,
76 network_keypair,
77 clock,
78 transaction_verifier,
79 commit_consumer,
80 registry,
81 boot_counter,
82 )
83 .await;
84 Self::WithTonic(authority)
85 }
86 }
87 }
88
89 pub async fn stop(self) {
90 match self {
91 Self::WithTonic(authority) => authority.stop().await,
92 }
93 }
94
95 pub fn transaction_client(&self) -> Arc<TransactionClient> {
96 match self {
97 Self::WithTonic(authority) => authority.transaction_client(),
98 }
99 }
100
101 #[cfg(test)]
102 fn context(&self) -> &Arc<Context> {
103 match self {
104 Self::WithTonic(authority) => &authority.context,
105 }
106 }
107}
108
/// Selects which network implementation `ConsensusAuthority::start` builds
/// the authority node on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkType {
    /// Use the tonic-based network manager (`TonicManager`).
    Tonic,
}
113
114pub(crate) struct AuthorityNode<N>
115where
116 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
117{
118 context: Arc<Context>,
119 start_time: Instant,
120 transaction_client: Arc<TransactionClient>,
121 synchronizer: Arc<SynchronizerHandle>,
122
123 commit_syncer_handle: CommitSyncerHandle,
124 round_prober_handle: RoundProberHandle,
125 proposed_block_handler: JoinHandle<()>,
126 leader_timeout_handle: LeaderTimeoutTaskHandle,
127 core_thread_handle: CoreThreadHandle,
128 subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
129 network_manager: N,
130}
131
132impl<N> AuthorityNode<N>
133where
134 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
135{
136 pub(crate) async fn start(
138 epoch_start_timestamp_ms: u64,
139 own_index: AuthorityIndex,
140 committee: Committee,
141 parameters: Parameters,
142 protocol_config: ProtocolConfig,
143 protocol_keypair: ProtocolKeyPair,
144 network_keypair: NetworkKeyPair,
145 clock: Arc<Clock>,
146 transaction_verifier: Arc<dyn TransactionVerifier>,
147 commit_consumer: CommitConsumerArgs,
148 registry: Registry,
149 boot_counter: u64,
150 ) -> Self {
151 assert!(
152 committee.is_valid_index(own_index),
153 "Invalid own index {}",
154 own_index
155 );
156 let own_hostname = committee.authority(own_index).hostname.clone();
157 info!(
158 "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
159 own_index,
160 own_hostname,
161 protocol_config.version,
162 epoch_start_timestamp_ms,
163 boot_counter,
164 commit_consumer.replay_after_commit_index,
165 commit_consumer.consumer_last_processed_commit_index
166 );
167 info!(
168 "Consensus authorities: {}",
169 committee
170 .authorities()
171 .map(|(i, a)| format!("{}: {}", i, a.hostname))
172 .join(", ")
173 );
174 info!("Consensus parameters: {:?}", parameters);
175 info!("Consensus committee: {:?}", committee);
176 let context = Arc::new(Context::new(
177 epoch_start_timestamp_ms,
178 own_index,
179 committee,
180 parameters,
181 protocol_config,
182 initialise_metrics(registry),
183 clock,
184 ));
185 let start_time = Instant::now();
186
187 context
188 .metrics
189 .node_metrics
190 .authority_index
191 .with_label_values(&[&own_hostname])
192 .set(context.own_index.value() as i64);
193 context
194 .metrics
195 .node_metrics
196 .protocol_version
197 .set(context.protocol_config.version.as_u64() as i64);
198
199 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
200 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
201
202 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
203
204 let mut network_manager = N::new(context.clone(), network_keypair);
205 let network_client = network_manager.client();
206
207 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
208 let store = Arc::new(RocksDBStore::new(store_path));
209 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
210
211 let block_verifier = Arc::new(SignedBlockVerifier::new(
212 context.clone(),
213 transaction_verifier,
214 ));
215
216 let transaction_certifier = TransactionCertifier::new(
217 context.clone(),
218 block_verifier.clone(),
219 dag_state.clone(),
220 commit_consumer.block_sender.clone(),
221 );
222
223 let mut proposed_block_handler = ProposedBlockHandler::new(
224 context.clone(),
225 signals_receivers.block_broadcast_receiver(),
226 transaction_certifier.clone(),
227 );
228
229 let proposed_block_handler =
230 spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
231
232 let sync_last_known_own_block = boot_counter == 0
233 && !context
234 .parameters
235 .sync_last_known_own_block_timeout
236 .is_zero();
237 info!(
238 "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
239 sync_last_known_own_block,
240 boot_counter,
241 context.parameters.sync_last_known_own_block_timeout
242 );
243
244 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
245
246 let leader_schedule = Arc::new(LeaderSchedule::from_store(
247 context.clone(),
248 dag_state.clone(),
249 ));
250
251 let commit_consumer_monitor = commit_consumer.monitor();
252 let commit_observer = CommitObserver::new(
253 context.clone(),
254 commit_consumer,
255 dag_state.clone(),
256 transaction_certifier.clone(),
257 leader_schedule.clone(),
258 )
259 .await;
260
261 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
262
263 let core = Core::new(
266 context.clone(),
267 leader_schedule,
268 tx_consumer,
269 transaction_certifier.clone(),
270 block_manager,
271 commit_observer,
272 core_signals,
273 protocol_keypair,
274 dag_state.clone(),
275 sync_last_known_own_block,
276 round_tracker.clone(),
277 );
278
279 let (core_dispatcher, core_thread_handle) =
280 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
281 let core_dispatcher = Arc::new(core_dispatcher);
282 let leader_timeout_handle =
283 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
284
285 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
286
287 let synchronizer = Synchronizer::start(
288 network_client.clone(),
289 context.clone(),
290 core_dispatcher.clone(),
291 commit_vote_monitor.clone(),
292 block_verifier.clone(),
293 transaction_certifier.clone(),
294 dag_state.clone(),
295 sync_last_known_own_block,
296 );
297
298 let commit_syncer_handle = CommitSyncer::new(
299 context.clone(),
300 core_dispatcher.clone(),
301 commit_vote_monitor.clone(),
302 commit_consumer_monitor.clone(),
303 block_verifier.clone(),
304 transaction_certifier.clone(),
305 network_client.clone(),
306 dag_state.clone(),
307 )
308 .start();
309
310 let round_prober_handle = RoundProber::new(
311 context.clone(),
312 core_dispatcher.clone(),
313 round_tracker.clone(),
314 dag_state.clone(),
315 network_client.clone(),
316 )
317 .start();
318
319 let network_service = Arc::new(AuthorityService::new(
320 context.clone(),
321 block_verifier,
322 commit_vote_monitor,
323 round_tracker.clone(),
324 synchronizer.clone(),
325 core_dispatcher,
326 signals_receivers.block_broadcast_receiver(),
327 transaction_certifier,
328 dag_state.clone(),
329 store,
330 ));
331
332 let subscriber = {
333 let s = Subscriber::new(
334 context.clone(),
335 network_client,
336 network_service.clone(),
337 dag_state,
338 );
339 for (peer, _) in context.committee.authorities() {
340 if peer != context.own_index {
341 s.subscribe(peer);
342 }
343 }
344 s
345 };
346
347 network_manager.install_service(network_service).await;
348
349 info!(
350 "Consensus authority started, took {:?}",
351 start_time.elapsed()
352 );
353
354 Self {
355 context,
356 start_time,
357 transaction_client: Arc::new(tx_client),
358 synchronizer,
359 commit_syncer_handle,
360 round_prober_handle,
361 proposed_block_handler,
362 leader_timeout_handle,
363 core_thread_handle,
364 subscriber,
365 network_manager,
366 }
367 }
368
369 pub(crate) async fn stop(mut self) {
370 info!(
371 "Stopping authority. Total run time: {:?}",
372 self.start_time.elapsed()
373 );
374
375 if let Err(e) = self.synchronizer.stop().await {
377 if e.is_panic() {
378 std::panic::resume_unwind(e.into_panic());
379 }
380 warn!(
381 "Failed to stop synchronizer when shutting down consensus: {:?}",
382 e
383 );
384 };
385 self.commit_syncer_handle.stop().await;
386 self.round_prober_handle.stop().await;
387 self.proposed_block_handler.abort();
388 self.leader_timeout_handle.stop().await;
389 self.core_thread_handle.stop().await;
391 self.subscriber.stop();
393 self.network_manager.stop().await;
394
395 self.context
396 .metrics
397 .node_metrics
398 .uptime
399 .observe(self.start_time.elapsed().as_secs_f64());
400 }
401
402 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
403 self.transaction_client.clone()
404 }
405}
406
// Integration-style tests that start real authorities on temporary stores.
#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use mysten_metrics::RegistryService;
    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
    use prometheus::Registry;
    use rstest::rstest;
    use sui_protocol_config::ProtocolConfig;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    /// A single-member committee can be started, exposes the expected
    /// context, and can be stopped.
    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        // Keep `temp_dir` alive for the whole test so the directory is
        // removed when it is dropped, instead of persisting it on disk.
        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    /// Four authorities commit all submitted transactions; authority 1 is
    /// then stopped and restarted on the same store.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut commit_receivers = Vec::with_capacity(committee.size());
        let mut block_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.push(authority);
        }

        submit_and_await_commits(&authorities, &mut commit_receivers).await;

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 on its existing store and let it run.
        let (authority, commit_receiver, block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        commit_receivers[index] = commit_receiver;
        block_receivers[index] = block_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// Committees of 1 to 3 authorities commit all submitted transactions;
    /// authority 0 is then stopped and restarted on the same store.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(NetworkType::Tonic)] network_type: NetworkType,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, _block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        submit_and_await_commits(&authorities, &mut output_receivers).await;

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 on its existing store and let it run.
        let (authority, commit_receiver, _block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// Authority 1 is restarted with an empty store and a reset boot counter
    /// while authority 2 is also down; once authority 2 is back, authority 1
    /// must output a commit containing one of its own blocks again.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut block_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                NetworkType::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Wait until authority 1 outputs a commit that contains a
        // non-genesis block it authored itself.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 and 2.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Restart authority 1 on a brand-new directory with its boot counter
        // reset to 0, which is what enables syncing the last known own block.
        let dir = TempDir::new().unwrap();
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver, _block_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            NetworkType::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Restart authority 2 on its original directory.
        let (authority, _commit_receiver, _block_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            NetworkType::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait for the restarted authority 1 to output a commit with one of
        // its own blocks. Each receive is bounded so that a failed recovery
        // fails the test instead of hanging it forever.
        'outer: while let Some(result) = timeout(Duration::from_secs(30), commit_receiver.recv())
            .await
            .expect("Timed out while waiting for a committed block from recovered authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    /// Submits 15 distinct 16-byte transactions, spread round-robin over
    /// `authorities`, then drains every receiver in `commit_receivers` until
    /// each one has output all of them.
    ///
    /// Panics if a receive takes longer than one second, if a receiver is
    /// closed early, if a committed transaction was never submitted or is seen
    /// twice, or if a committed sub-dag carries reputation scores.
    async fn submit_and_await_commits(
        authorities: &[ConsensusAuthority],
        commit_receivers: &mut [UnboundedReceiver<CommittedSubDag>],
    ) {
        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        for receiver in commit_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag = timeout(Duration::from_secs(1), receiver.recv())
                    .await
                    .expect("Timed out while waiting for a committed sub-dag")
                    .expect("Commit receiver closed before all transactions were committed");
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }
    }

    /// Starts one authority whose store lives in `db_dir`, using small
    /// cache / commit-sync parameters and a 2 second timeout for syncing the
    /// last known own block.
    ///
    /// Returns the authority together with the commit and certified-block
    /// receivers of its freshly created commit consumer.
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: NetworkType,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (
        ConsensusAuthority,
        UnboundedReceiver<CommittedSubDag>,
        UnboundedReceiver<CertifiedBlocksOutput>,
    ) {
        let registry = Registry::new();

        let parameters = Parameters {
            // Borrow the path so the caller keeps ownership of the directory
            // and can reuse it across restarts.
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, commit_receiver, block_receiver)
    }
}