1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
7use itertools::Itertools;
8use mysten_metrics::spawn_logged_monitored_task;
9use parking_lot::RwLock;
10use prometheus::Registry;
11use sui_protocol_config::ProtocolConfig;
12use tokio::task::JoinHandle;
13use tracing::{info, warn};
14
15use crate::{
16 CommitConsumerArgs,
17 authority_service::AuthorityService,
18 block_manager::BlockManager,
19 block_verifier::SignedBlockVerifier,
20 commit_observer::CommitObserver,
21 commit_syncer::{CommitSyncer, CommitSyncerHandle},
22 commit_vote_monitor::CommitVoteMonitor,
23 context::{Clock, Context},
24 core::{Core, CoreSignals},
25 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
26 dag_state::DagState,
27 leader_schedule::LeaderSchedule,
28 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
29 metrics::initialise_metrics,
30 network::{NetworkManager, tonic_network::TonicManager},
31 proposed_block_handler::ProposedBlockHandler,
32 round_prober::{RoundProber, RoundProberHandle},
33 round_tracker::PeerRoundTracker,
34 storage::rocksdb_store::RocksDBStore,
35 subscriber::Subscriber,
36 synchronizer::{Synchronizer, SynchronizerHandle},
37 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
38 transaction_certifier::TransactionCertifier,
39};
40
41#[allow(private_interfaces)]
44pub enum ConsensusAuthority {
45 WithTonic(AuthorityNode<TonicManager>),
46}
47
48impl ConsensusAuthority {
49 pub async fn start(
50 network_type: NetworkType,
51 epoch_start_timestamp_ms: u64,
52 own_index: AuthorityIndex,
53 committee: Committee,
54 parameters: Parameters,
55 protocol_config: ProtocolConfig,
56 protocol_keypair: ProtocolKeyPair,
57 network_keypair: NetworkKeyPair,
58 clock: Arc<Clock>,
59 transaction_verifier: Arc<dyn TransactionVerifier>,
60 commit_consumer: CommitConsumerArgs,
61 registry: Registry,
62 boot_counter: u64,
67 ) -> Self {
68 match network_type {
69 NetworkType::Tonic => {
70 let authority = AuthorityNode::start(
71 epoch_start_timestamp_ms,
72 own_index,
73 committee,
74 parameters,
75 protocol_config,
76 protocol_keypair,
77 network_keypair,
78 clock,
79 transaction_verifier,
80 commit_consumer,
81 registry,
82 boot_counter,
83 )
84 .await;
85 Self::WithTonic(authority)
86 }
87 }
88 }
89
90 pub async fn stop(self) {
91 match self {
92 Self::WithTonic(authority) => authority.stop().await,
93 }
94 }
95
96 pub fn transaction_client(&self) -> Arc<TransactionClient> {
97 match self {
98 Self::WithTonic(authority) => authority.transaction_client(),
99 }
100 }
101
102 #[cfg(test)]
103 fn context(&self) -> &Arc<Context> {
104 match self {
105 Self::WithTonic(authority) => &authority.context,
106 }
107 }
108}
109
/// Selects which network implementation `ConsensusAuthority::start` uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkType {
    /// Tonic-based networking; `ConsensusAuthority::start` maps it to the `WithTonic` variant.
    Tonic,
}
114
115pub(crate) struct AuthorityNode<N>
116where
117 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
118{
119 context: Arc<Context>,
120 start_time: Instant,
121 transaction_client: Arc<TransactionClient>,
122 synchronizer: Arc<SynchronizerHandle>,
123
124 commit_syncer_handle: CommitSyncerHandle,
125 round_prober_handle: RoundProberHandle,
126 proposed_block_handler: JoinHandle<()>,
127 leader_timeout_handle: LeaderTimeoutTaskHandle,
128 core_thread_handle: CoreThreadHandle,
129 subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
130 network_manager: N,
131}
132
133impl<N> AuthorityNode<N>
134where
135 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
136{
137 pub(crate) async fn start(
138 epoch_start_timestamp_ms: u64,
139 own_index: AuthorityIndex,
140 committee: Committee,
141 parameters: Parameters,
142 protocol_config: ProtocolConfig,
143 protocol_keypair: ProtocolKeyPair,
146 network_keypair: NetworkKeyPair,
147 clock: Arc<Clock>,
148 transaction_verifier: Arc<dyn TransactionVerifier>,
149 commit_consumer: CommitConsumerArgs,
150 registry: Registry,
151 boot_counter: u64,
152 ) -> Self {
153 assert!(
154 committee.is_valid_index(own_index),
155 "Invalid own index {}",
156 own_index
157 );
158 let own_hostname = committee.authority(own_index).hostname.clone();
159 info!(
160 "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
161 own_index,
162 own_hostname,
163 protocol_config.version,
164 epoch_start_timestamp_ms,
165 boot_counter,
166 commit_consumer.replay_after_commit_index,
167 commit_consumer.consumer_last_processed_commit_index
168 );
169 info!(
170 "Consensus authorities: {}",
171 committee
172 .authorities()
173 .map(|(i, a)| format!("{}: {}", i, a.hostname))
174 .join(", ")
175 );
176 info!("Consensus parameters: {:?}", parameters);
177 info!("Consensus committee: {:?}", committee);
178 let context = Arc::new(Context::new(
179 epoch_start_timestamp_ms,
180 own_index,
181 committee,
182 parameters,
183 protocol_config,
184 initialise_metrics(registry),
185 clock,
186 ));
187 let start_time = Instant::now();
188
189 context
190 .metrics
191 .node_metrics
192 .authority_index
193 .with_label_values(&[&own_hostname])
194 .set(context.own_index.value() as i64);
195 context
196 .metrics
197 .node_metrics
198 .protocol_version
199 .set(context.protocol_config.version.as_u64() as i64);
200
201 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
202 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
203
204 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
205
206 let mut network_manager = N::new(context.clone(), network_keypair);
207 let network_client = network_manager.client();
208
209 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
210 let store = Arc::new(RocksDBStore::new(store_path));
211 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
212
213 let block_verifier = Arc::new(SignedBlockVerifier::new(
214 context.clone(),
215 transaction_verifier,
216 ));
217
218 let transaction_certifier = TransactionCertifier::new(
219 context.clone(),
220 block_verifier.clone(),
221 dag_state.clone(),
222 commit_consumer.block_sender.clone(),
223 );
224
225 let mut proposed_block_handler = ProposedBlockHandler::new(
226 context.clone(),
227 signals_receivers.block_broadcast_receiver(),
228 transaction_certifier.clone(),
229 );
230
231 let proposed_block_handler =
232 spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
233
234 let sync_last_known_own_block = boot_counter == 0
235 && dag_state.read().highest_accepted_round() == 0
236 && !context
237 .parameters
238 .sync_last_known_own_block_timeout
239 .is_zero();
240 info!("Sync last known own block: {sync_last_known_own_block}");
241
242 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
243
244 let leader_schedule = Arc::new(LeaderSchedule::from_store(
245 context.clone(),
246 dag_state.clone(),
247 ));
248
249 let commit_consumer_monitor = commit_consumer.monitor();
250 let commit_observer = CommitObserver::new(
251 context.clone(),
252 commit_consumer,
253 dag_state.clone(),
254 transaction_certifier.clone(),
255 leader_schedule.clone(),
256 )
257 .await;
258
259 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
260
261 let core = Core::new(
262 context.clone(),
263 leader_schedule,
264 tx_consumer,
265 transaction_certifier.clone(),
266 block_manager,
267 commit_observer,
268 core_signals,
269 protocol_keypair,
270 dag_state.clone(),
271 sync_last_known_own_block,
272 round_tracker.clone(),
273 );
274
275 let (core_dispatcher, core_thread_handle) =
276 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
277 let core_dispatcher = Arc::new(core_dispatcher);
278 let leader_timeout_handle =
279 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
280
281 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
282
283 let synchronizer = Synchronizer::start(
284 network_client.clone(),
285 context.clone(),
286 core_dispatcher.clone(),
287 commit_vote_monitor.clone(),
288 block_verifier.clone(),
289 transaction_certifier.clone(),
290 dag_state.clone(),
291 sync_last_known_own_block,
292 );
293
294 let commit_syncer_handle = CommitSyncer::new(
295 context.clone(),
296 core_dispatcher.clone(),
297 commit_vote_monitor.clone(),
298 commit_consumer_monitor.clone(),
299 block_verifier.clone(),
300 transaction_certifier.clone(),
301 network_client.clone(),
302 dag_state.clone(),
303 )
304 .start();
305
306 let round_prober_handle = RoundProber::new(
307 context.clone(),
308 core_dispatcher.clone(),
309 round_tracker.clone(),
310 dag_state.clone(),
311 network_client.clone(),
312 )
313 .start();
314
315 let network_service = Arc::new(AuthorityService::new(
316 context.clone(),
317 block_verifier,
318 commit_vote_monitor,
319 round_tracker.clone(),
320 synchronizer.clone(),
321 core_dispatcher,
322 signals_receivers.block_broadcast_receiver(),
323 transaction_certifier,
324 dag_state.clone(),
325 store,
326 ));
327
328 let subscriber = {
329 let s = Subscriber::new(
330 context.clone(),
331 network_client,
332 network_service.clone(),
333 dag_state,
334 );
335 for (peer, _) in context.committee.authorities() {
336 if peer != context.own_index {
337 s.subscribe(peer);
338 }
339 }
340 s
341 };
342
343 network_manager.install_service(network_service).await;
344
345 info!(
346 "Consensus authority started, took {:?}",
347 start_time.elapsed()
348 );
349
350 Self {
351 context,
352 start_time,
353 transaction_client: Arc::new(tx_client),
354 synchronizer,
355 commit_syncer_handle,
356 round_prober_handle,
357 proposed_block_handler,
358 leader_timeout_handle,
359 core_thread_handle,
360 subscriber,
361 network_manager,
362 }
363 }
364
365 pub(crate) async fn stop(mut self) {
366 info!(
367 "Stopping authority. Total run time: {:?}",
368 self.start_time.elapsed()
369 );
370
371 if let Err(e) = self.synchronizer.stop().await {
373 if e.is_panic() {
374 std::panic::resume_unwind(e.into_panic());
375 }
376 warn!(
377 "Failed to stop synchronizer when shutting down consensus: {:?}",
378 e
379 );
380 };
381 self.commit_syncer_handle.stop().await;
382 self.round_prober_handle.stop().await;
383 self.proposed_block_handler.abort();
384 self.leader_timeout_handle.stop().await;
385 self.core_thread_handle.stop().await;
387 self.subscriber.stop();
389 self.network_manager.stop().await;
390
391 self.context
392 .metrics
393 .node_metrics
394 .uptime
395 .observe(self.start_time.elapsed().as_secs_f64());
396 }
397
398 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
399 self.transaction_client.clone()
400 }
401}
402
403#[cfg(test)]
404mod tests {
405 #![allow(non_snake_case)]
406
407 use std::{
408 collections::{BTreeMap, BTreeSet},
409 sync::Arc,
410 time::Duration,
411 };
412
413 use consensus_config::{Parameters, local_committee_and_keys};
414 use mysten_metrics::RegistryService;
415 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
416 use prometheus::Registry;
417 use rstest::rstest;
418 use sui_protocol_config::ProtocolConfig;
419 use tempfile::TempDir;
420 use tokio::time::{sleep, timeout};
421 use typed_store::DBMetrics;
422
423 use super::*;
424 use crate::{
425 CommittedSubDag,
426 block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
427 transaction::NoopTransactionVerifier,
428 };
429
430 #[rstest]
431 #[tokio::test]
432 async fn test_authority_start_and_stop(
433 #[values(NetworkType::Tonic)] network_type: NetworkType,
434 ) {
435 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
436 let registry = Registry::new();
437
438 let temp_dir = TempDir::new().unwrap();
439 let parameters = Parameters {
440 db_path: temp_dir.keep(),
441 ..Default::default()
442 };
443 let txn_verifier = NoopTransactionVerifier {};
444
445 let own_index = committee.to_authority_index(0).unwrap();
446 let protocol_keypair = keypairs[own_index].1.clone();
447 let network_keypair = keypairs[own_index].0.clone();
448
449 let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);
450
451 let authority = ConsensusAuthority::start(
452 network_type,
453 0,
454 own_index,
455 committee,
456 parameters,
457 ProtocolConfig::get_for_max_version_UNSAFE(),
458 protocol_keypair,
459 network_keypair,
460 Arc::new(Clock::default()),
461 Arc::new(txn_verifier),
462 commit_consumer,
463 registry,
464 0,
465 )
466 .await;
467
468 assert_eq!(authority.context().own_index, own_index);
469 assert_eq!(authority.context().committee.epoch(), 0);
470 assert_eq!(authority.context().committee.size(), 1);
471
472 authority.stop().await;
473 }
474
475 #[rstest]
477 #[tokio::test(flavor = "current_thread")]
478 async fn test_authority_committee(
479 #[values(NetworkType::Tonic)] network_type: NetworkType,
480 #[values(5, 10)] gc_depth: u32,
481 ) {
482 telemetry_subscribers::init_for_testing();
483 let db_registry = Registry::new();
484 DBMetrics::init(RegistryService::new(db_registry));
485
486 const NUM_OF_AUTHORITIES: usize = 4;
487 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
488 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
489 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
490
491 let temp_dirs = (0..NUM_OF_AUTHORITIES)
492 .map(|_| TempDir::new().unwrap())
493 .collect::<Vec<_>>();
494
495 let mut commit_receivers = Vec::with_capacity(committee.size());
496 let mut block_receivers = Vec::with_capacity(committee.size());
497 let mut authorities = Vec::with_capacity(committee.size());
498 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
499
500 for (index, _authority_info) in committee.authorities() {
501 let (authority, commit_receiver, block_receiver) = make_authority(
502 index,
503 &temp_dirs[index.value()],
504 committee.clone(),
505 keypairs.clone(),
506 network_type,
507 boot_counters[index],
508 protocol_config.clone(),
509 )
510 .await;
511 boot_counters[index] += 1;
512 commit_receivers.push(commit_receiver);
513 block_receivers.push(block_receiver);
514 authorities.push(authority);
515 }
516
517 const NUM_TRANSACTIONS: u8 = 15;
518 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
519 for i in 0..NUM_TRANSACTIONS {
520 let txn = vec![i; 16];
521 submitted_transactions.insert(txn.clone());
522 authorities[i as usize % authorities.len()]
523 .transaction_client()
524 .submit(vec![txn])
525 .await
526 .unwrap();
527 }
528
529 for receiver in &mut commit_receivers {
530 let mut expected_transactions = submitted_transactions.clone();
531 loop {
532 let committed_subdag =
533 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
534 .await
535 .unwrap()
536 .unwrap();
537 for b in committed_subdag.blocks {
538 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
539 assert!(
540 expected_transactions.remove(&txn),
541 "Transaction not submitted or already seen: {:?}",
542 txn
543 );
544 }
545 }
546 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
547 if expected_transactions.is_empty() {
548 break;
549 }
550 }
551 }
552
553 let index = committee.to_authority_index(1).unwrap();
555 authorities.remove(index.value()).stop().await;
556 sleep(Duration::from_secs(10)).await;
557
558 let (authority, commit_receiver, block_receiver) = make_authority(
560 index,
561 &temp_dirs[index.value()],
562 committee.clone(),
563 keypairs.clone(),
564 network_type,
565 boot_counters[index],
566 protocol_config.clone(),
567 )
568 .await;
569 boot_counters[index] += 1;
570 commit_receivers[index] = commit_receiver;
571 block_receivers[index] = block_receiver;
572 authorities.insert(index.value(), authority);
573 sleep(Duration::from_secs(10)).await;
574
575 for authority in authorities {
577 authority.stop().await;
578 }
579 }
580
581 #[rstest]
582 #[tokio::test(flavor = "current_thread")]
583 async fn test_small_committee(
584 #[values(NetworkType::Tonic)] network_type: NetworkType,
585 #[values(1, 2, 3)] num_authorities: usize,
586 ) {
587 telemetry_subscribers::init_for_testing();
588 let db_registry = Registry::new();
589 DBMetrics::init(RegistryService::new(db_registry));
590
591 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
592 let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
593
594 let temp_dirs = (0..num_authorities)
595 .map(|_| TempDir::new().unwrap())
596 .collect::<Vec<_>>();
597
598 let mut output_receivers = Vec::with_capacity(committee.size());
599 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
600 let mut boot_counters = vec![0; num_authorities];
601
602 for (index, _authority_info) in committee.authorities() {
603 let (authority, commit_receiver, _block_receiver) = make_authority(
604 index,
605 &temp_dirs[index.value()],
606 committee.clone(),
607 keypairs.clone(),
608 network_type,
609 boot_counters[index],
610 protocol_config.clone(),
611 )
612 .await;
613 boot_counters[index] += 1;
614 output_receivers.push(commit_receiver);
615 authorities.push(authority);
616 }
617
618 const NUM_TRANSACTIONS: u8 = 15;
619 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
620 for i in 0..NUM_TRANSACTIONS {
621 let txn = vec![i; 16];
622 submitted_transactions.insert(txn.clone());
623 authorities[i as usize % authorities.len()]
624 .transaction_client()
625 .submit(vec![txn])
626 .await
627 .unwrap();
628 }
629
630 for receiver in &mut output_receivers {
631 let mut expected_transactions = submitted_transactions.clone();
632 loop {
633 let committed_subdag =
634 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
635 .await
636 .unwrap()
637 .unwrap();
638 for b in committed_subdag.blocks {
639 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
640 assert!(
641 expected_transactions.remove(&txn),
642 "Transaction not submitted or already seen: {:?}",
643 txn
644 );
645 }
646 }
647 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
648 if expected_transactions.is_empty() {
649 break;
650 }
651 }
652 }
653
654 let index = committee.to_authority_index(0).unwrap();
656 authorities.remove(index.value()).stop().await;
657 sleep(Duration::from_secs(10)).await;
658
659 let (authority, commit_receiver, _block_receiver) = make_authority(
661 index,
662 &temp_dirs[index.value()],
663 committee.clone(),
664 keypairs.clone(),
665 network_type,
666 boot_counters[index],
667 protocol_config.clone(),
668 )
669 .await;
670 boot_counters[index] += 1;
671 output_receivers[index] = commit_receiver;
672 authorities.insert(index.value(), authority);
673 sleep(Duration::from_secs(10)).await;
674
675 for authority in authorities {
677 authority.stop().await;
678 }
679 }
680
681 #[rstest]
682 #[tokio::test(flavor = "current_thread")]
683 async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
684 telemetry_subscribers::init_for_testing();
685 let db_registry = Registry::new();
686 DBMetrics::init(RegistryService::new(db_registry));
687
688 const NUM_OF_AUTHORITIES: usize = 4;
689 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
690 let mut commit_receivers = vec![];
691 let mut block_receivers = vec![];
692 let mut authorities = BTreeMap::new();
693 let mut temp_dirs = BTreeMap::new();
694 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
695
696 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
697 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
698
699 for (index, _authority_info) in committee.authorities() {
700 let dir = TempDir::new().unwrap();
701 let (authority, commit_receiver, block_receiver) = make_authority(
702 index,
703 &dir,
704 committee.clone(),
705 keypairs.clone(),
706 NetworkType::Tonic,
707 boot_counters[index],
708 protocol_config.clone(),
709 )
710 .await;
711 boot_counters[index] += 1;
712 commit_receivers.push(commit_receiver);
713 block_receivers.push(block_receiver);
714 authorities.insert(index, authority);
715 temp_dirs.insert(index, dir);
716 }
717
718 let index_1 = committee.to_authority_index(1).unwrap();
722 'outer: while let Some(result) =
723 timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
724 .await
725 .expect("Timed out while waiting for at least one committed block from authority 1")
726 {
727 for block in result.blocks {
728 if block.round() > GENESIS_ROUND && block.author() == index_1 {
729 break 'outer;
730 }
731 }
732 }
733
734 authorities.remove(&index_1).unwrap().stop().await;
740 let index_2 = committee.to_authority_index(2).unwrap();
741 authorities.remove(&index_2).unwrap().stop().await;
742 sleep(Duration::from_secs(5)).await;
743
744 let dir = TempDir::new().unwrap();
748 boot_counters[index_1] = 0;
750 let (authority, mut commit_receiver, _block_receiver) = make_authority(
751 index_1,
752 &dir,
753 committee.clone(),
754 keypairs.clone(),
755 NetworkType::Tonic,
756 boot_counters[index_1],
757 protocol_config.clone(),
758 )
759 .await;
760 boot_counters[index_1] += 1;
761 authorities.insert(index_1, authority);
762 temp_dirs.insert(index_1, dir);
763 sleep(Duration::from_secs(5)).await;
764
765 let (authority, _commit_receiver, _block_receiver) = make_authority(
768 index_2,
769 &temp_dirs[&index_2],
770 committee.clone(),
771 keypairs,
772 NetworkType::Tonic,
773 boot_counters[index_2],
774 protocol_config.clone(),
775 )
776 .await;
777 boot_counters[index_2] += 1;
778 authorities.insert(index_2, authority);
779 sleep(Duration::from_secs(5)).await;
780
781 'outer: while let Some(result) = commit_receiver.recv().await {
783 for block in result.blocks {
784 if block.round() > GENESIS_ROUND && block.author() == index_1 {
785 break 'outer;
786 }
787 }
788 }
789
790 for (_, authority) in authorities {
792 authority.stop().await;
793 }
794 }
795
796 async fn make_authority(
798 index: AuthorityIndex,
799 db_dir: &TempDir,
800 committee: Committee,
801 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
802 network_type: NetworkType,
803 boot_counter: u64,
804 protocol_config: ProtocolConfig,
805 ) -> (
806 ConsensusAuthority,
807 UnboundedReceiver<CommittedSubDag>,
808 UnboundedReceiver<CertifiedBlocksOutput>,
809 ) {
810 let registry = Registry::new();
811
812 let parameters = Parameters {
814 db_path: db_dir.path().to_path_buf(),
815 dag_state_cached_rounds: 5,
816 commit_sync_parallel_fetches: 2,
817 commit_sync_batch_size: 3,
818 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
819 ..Default::default()
820 };
821 let txn_verifier = NoopTransactionVerifier {};
822
823 let protocol_keypair = keypairs[index].1.clone();
824 let network_keypair = keypairs[index].0.clone();
825
826 let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);
827
828 let authority = ConsensusAuthority::start(
829 network_type,
830 0,
831 index,
832 committee,
833 parameters,
834 protocol_config,
835 protocol_keypair,
836 network_keypair,
837 Arc::new(Clock::default()),
838 Arc::new(txn_verifier),
839 commit_consumer,
840 registry,
841 boot_counter,
842 )
843 .await;
844
845 (authority, commit_receiver, block_receiver)
846 }
847}