1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7 AuthorityIndex, Committee, NetworkKeyPair, NetworkPublicKey, Parameters, ProtocolKeyPair,
8};
9use consensus_types::block::Round;
10use itertools::Itertools;
11use mysten_metrics::spawn_logged_monitored_task;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use sui_protocol_config::ProtocolConfig;
16use tokio::task::JoinHandle;
17use tracing::{info, warn};
18
19use crate::{
20 BlockAPI as _, CommitConsumerArgs,
21 authority_service::AuthorityService,
22 block_manager::BlockManager,
23 block_verifier::SignedBlockVerifier,
24 commit_observer::CommitObserver,
25 commit_syncer::{CommitSyncer, CommitSyncerHandle},
26 commit_vote_monitor::CommitVoteMonitor,
27 context::{Clock, Context},
28 core::{Core, CoreSignals},
29 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
30 dag_state::DagState,
31 leader_schedule::LeaderSchedule,
32 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
33 metrics::initialise_metrics,
34 network::{NetworkManager, tonic_network::TonicManager},
35 proposed_block_handler::ProposedBlockHandler,
36 round_prober::{RoundProber, RoundProberHandle},
37 round_tracker::RoundTracker,
38 storage::rocksdb_store::RocksDBStore,
39 subscriber::Subscriber,
40 synchronizer::{Synchronizer, SynchronizerHandle},
41 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
42 transaction_certifier::TransactionCertifier,
43};
44
45#[allow(private_interfaces)]
48pub enum ConsensusAuthority {
49 WithTonic(AuthorityNode<TonicManager>),
50}
51
52impl ConsensusAuthority {
53 pub async fn start(
54 network_type: NetworkType,
55 epoch_start_timestamp_ms: u64,
56 own_index: AuthorityIndex,
57 committee: Committee,
58 parameters: Parameters,
59 protocol_config: ProtocolConfig,
60 protocol_keypair: ProtocolKeyPair,
61 network_keypair: NetworkKeyPair,
62 clock: Arc<Clock>,
63 transaction_verifier: Arc<dyn TransactionVerifier>,
64 commit_consumer: CommitConsumerArgs,
65 registry: Registry,
66 boot_counter: u64,
70 ) -> Self {
71 match network_type {
72 NetworkType::Tonic => {
73 let authority = AuthorityNode::start(
74 epoch_start_timestamp_ms,
75 own_index,
76 committee,
77 parameters,
78 protocol_config,
79 protocol_keypair,
80 network_keypair,
81 clock,
82 transaction_verifier,
83 commit_consumer,
84 registry,
85 boot_counter,
86 )
87 .await;
88 Self::WithTonic(authority)
89 }
90 }
91 }
92
93 pub async fn stop(self) {
94 match self {
95 Self::WithTonic(authority) => authority.stop().await,
96 }
97 }
98
99 pub fn update_peer_address(
100 &self,
101 network_pubkey: NetworkPublicKey,
102 address: Option<Multiaddr>,
103 ) {
104 match self {
105 Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
106 }
107 }
108
109 pub fn transaction_client(&self) -> Arc<TransactionClient> {
110 match self {
111 Self::WithTonic(authority) => authority.transaction_client(),
112 }
113 }
114
115 #[cfg(test)]
116 fn context(&self) -> &Arc<Context> {
117 match self {
118 Self::WithTonic(authority) => &authority.context,
119 }
120 }
121}
122
123#[derive(Clone, Copy, PartialEq, Eq, Debug)]
124pub enum NetworkType {
125 Tonic,
126}
127
128pub(crate) struct AuthorityNode<N>
129where
130 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
131{
132 context: Arc<Context>,
133 start_time: Instant,
134 transaction_client: Arc<TransactionClient>,
135 synchronizer: Arc<SynchronizerHandle>,
136
137 commit_syncer_handle: CommitSyncerHandle,
138 round_prober_handle: RoundProberHandle,
139 proposed_block_handler: JoinHandle<()>,
140 leader_timeout_handle: LeaderTimeoutTaskHandle,
141 core_thread_handle: CoreThreadHandle,
142 subscriber: Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>,
143 network_manager: N,
144}
145
146impl<N> AuthorityNode<N>
147where
148 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
149{
150 pub(crate) async fn start(
152 epoch_start_timestamp_ms: u64,
153 own_index: AuthorityIndex,
154 committee: Committee,
155 parameters: Parameters,
156 protocol_config: ProtocolConfig,
157 protocol_keypair: ProtocolKeyPair,
158 network_keypair: NetworkKeyPair,
159 clock: Arc<Clock>,
160 transaction_verifier: Arc<dyn TransactionVerifier>,
161 commit_consumer: CommitConsumerArgs,
162 registry: Registry,
163 boot_counter: u64,
164 ) -> Self {
165 assert!(
166 committee.is_valid_index(own_index),
167 "Invalid own index {}",
168 own_index
169 );
170 let own_hostname = committee.authority(own_index).hostname.clone();
171 info!(
172 "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
173 own_index,
174 own_hostname,
175 protocol_config.version,
176 epoch_start_timestamp_ms,
177 boot_counter,
178 commit_consumer.replay_after_commit_index,
179 commit_consumer.consumer_last_processed_commit_index
180 );
181 info!(
182 "Consensus authorities: {}",
183 committee
184 .authorities()
185 .map(|(i, a)| format!("{}: {}", i, a.hostname))
186 .join(", ")
187 );
188 info!("Consensus parameters: {:?}", parameters);
189 info!("Consensus committee: {:?}", committee);
190 let context = Arc::new(Context::new(
191 epoch_start_timestamp_ms,
192 own_index,
193 committee,
194 parameters,
195 protocol_config,
196 initialise_metrics(registry),
197 clock,
198 ));
199 let start_time = Instant::now();
200
201 context
202 .metrics
203 .node_metrics
204 .authority_index
205 .with_label_values(&[&own_hostname])
206 .set(context.own_index.value() as i64);
207 context
208 .metrics
209 .node_metrics
210 .protocol_version
211 .set(context.protocol_config.version.as_u64() as i64);
212
213 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
214 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
215
216 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
217
218 let mut network_manager = N::new(context.clone(), network_keypair);
219 let network_client = network_manager.client();
220
221 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
222 let store = Arc::new(RocksDBStore::new(store_path));
223 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
224
225 let block_verifier = Arc::new(SignedBlockVerifier::new(
226 context.clone(),
227 transaction_verifier,
228 ));
229
230 let transaction_certifier = TransactionCertifier::new(
231 context.clone(),
232 block_verifier.clone(),
233 dag_state.clone(),
234 commit_consumer.block_sender.clone(),
235 );
236
237 let mut proposed_block_handler = ProposedBlockHandler::new(
238 context.clone(),
239 signals_receivers.block_broadcast_receiver(),
240 transaction_certifier.clone(),
241 );
242
243 let proposed_block_handler =
244 spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
245
246 let sync_last_known_own_block = boot_counter == 0
247 && !context
248 .parameters
249 .sync_last_known_own_block_timeout
250 .is_zero();
251 info!(
252 "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
253 sync_last_known_own_block,
254 boot_counter,
255 context.parameters.sync_last_known_own_block_timeout
256 );
257
258 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
259
260 let leader_schedule = Arc::new(LeaderSchedule::from_store(
261 context.clone(),
262 dag_state.clone(),
263 ));
264
265 let commit_consumer_monitor = commit_consumer.monitor();
266 let commit_observer = CommitObserver::new(
267 context.clone(),
268 commit_consumer,
269 dag_state.clone(),
270 transaction_certifier.clone(),
271 leader_schedule.clone(),
272 )
273 .await;
274
275 let initial_received_rounds = dag_state
276 .read()
277 .get_last_cached_block_per_authority(Round::MAX)
278 .into_iter()
279 .map(|(block, _)| block.round())
280 .collect::<Vec<_>>();
281 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
282 context.clone(),
283 initial_received_rounds,
284 )));
285
286 let core = Core::new(
289 context.clone(),
290 leader_schedule,
291 tx_consumer,
292 transaction_certifier.clone(),
293 block_manager,
294 commit_observer,
295 core_signals,
296 protocol_keypair,
297 dag_state.clone(),
298 sync_last_known_own_block,
299 round_tracker.clone(),
300 );
301
302 let (core_dispatcher, core_thread_handle) =
303 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
304 let core_dispatcher = Arc::new(core_dispatcher);
305 let leader_timeout_handle =
306 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
307
308 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
309
310 let synchronizer = Synchronizer::start(
311 network_client.clone(),
312 context.clone(),
313 core_dispatcher.clone(),
314 commit_vote_monitor.clone(),
315 block_verifier.clone(),
316 transaction_certifier.clone(),
317 round_tracker.clone(),
318 dag_state.clone(),
319 sync_last_known_own_block,
320 );
321
322 let commit_syncer_handle = CommitSyncer::new(
323 context.clone(),
324 core_dispatcher.clone(),
325 commit_vote_monitor.clone(),
326 commit_consumer_monitor.clone(),
327 block_verifier.clone(),
328 transaction_certifier.clone(),
329 round_tracker.clone(),
330 network_client.clone(),
331 dag_state.clone(),
332 )
333 .start();
334
335 let round_prober_handle = RoundProber::new(
336 context.clone(),
337 core_dispatcher.clone(),
338 round_tracker.clone(),
339 dag_state.clone(),
340 network_client.clone(),
341 )
342 .start();
343
344 let network_service = Arc::new(AuthorityService::new(
345 context.clone(),
346 block_verifier,
347 commit_vote_monitor,
348 round_tracker.clone(),
349 synchronizer.clone(),
350 core_dispatcher,
351 signals_receivers.block_broadcast_receiver(),
352 transaction_certifier,
353 dag_state.clone(),
354 store,
355 ));
356
357 let subscriber = {
358 let s = Subscriber::new(
359 context.clone(),
360 network_client,
361 network_service.clone(),
362 dag_state,
363 );
364 for (peer, _) in context.committee.authorities() {
365 if peer != context.own_index {
366 s.subscribe(peer);
367 }
368 }
369 s
370 };
371
372 network_manager.install_service(network_service).await;
373
374 info!(
375 "Consensus authority started, took {:?}",
376 start_time.elapsed()
377 );
378
379 Self {
380 context,
381 start_time,
382 transaction_client: Arc::new(tx_client),
383 synchronizer,
384 commit_syncer_handle,
385 round_prober_handle,
386 proposed_block_handler,
387 leader_timeout_handle,
388 core_thread_handle,
389 subscriber,
390 network_manager,
391 }
392 }
393
394 pub(crate) async fn stop(mut self) {
395 info!(
396 "Stopping authority. Total run time: {:?}",
397 self.start_time.elapsed()
398 );
399
400 if let Err(e) = self.synchronizer.stop().await {
402 if e.is_panic() {
403 std::panic::resume_unwind(e.into_panic());
404 }
405 warn!(
406 "Failed to stop synchronizer when shutting down consensus: {:?}",
407 e
408 );
409 };
410 self.commit_syncer_handle.stop().await;
411 self.round_prober_handle.stop().await;
412 self.proposed_block_handler.abort();
413 self.leader_timeout_handle.stop().await;
414 self.core_thread_handle.stop().await;
416 self.subscriber.stop();
418 self.network_manager.stop().await;
419
420 self.context
421 .metrics
422 .node_metrics
423 .uptime
424 .observe(self.start_time.elapsed().as_secs_f64());
425 }
426
427 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
428 self.transaction_client.clone()
429 }
430
431 pub(crate) fn update_peer_address(
432 &self,
433 network_pubkey: NetworkPublicKey,
434 address: Option<Multiaddr>,
435 ) {
436 let Some(peer) = self
438 .context
439 .committee
440 .authorities()
441 .find(|(_, authority)| authority.network_key == network_pubkey)
442 .map(|(index, _)| index)
443 else {
444 warn!(
445 "Network public key {:?} not found in committee, ignoring address update",
446 network_pubkey
447 );
448 return;
449 };
450
451 self.network_manager.update_peer_address(peer, address);
453
454 if peer != self.context.own_index {
456 info!("Re-subscribing to peer {} after address update", peer);
457 self.subscriber.subscribe(peer);
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 #![allow(non_snake_case)]
465
466 use std::{
467 collections::{BTreeMap, BTreeSet},
468 sync::Arc,
469 time::Duration,
470 };
471
472 use consensus_config::{Parameters, local_committee_and_keys};
473 use mysten_metrics::RegistryService;
474 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
475 use prometheus::Registry;
476 use rstest::rstest;
477 use sui_protocol_config::ProtocolConfig;
478 use tempfile::TempDir;
479 use tokio::time::{sleep, timeout};
480 use typed_store::DBMetrics;
481
482 use super::*;
483 use crate::{
484 CommittedSubDag,
485 block::{BlockAPI as _, GENESIS_ROUND},
486 transaction::NoopTransactionVerifier,
487 };
488
489 #[rstest]
490 #[tokio::test]
491 async fn test_authority_start_and_stop(
492 #[values(NetworkType::Tonic)] network_type: NetworkType,
493 ) {
494 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
495 let registry = Registry::new();
496
497 let temp_dir = TempDir::new().unwrap();
498 let parameters = Parameters {
499 db_path: temp_dir.keep(),
500 ..Default::default()
501 };
502 let txn_verifier = NoopTransactionVerifier {};
503
504 let own_index = committee.to_authority_index(0).unwrap();
505 let protocol_keypair = keypairs[own_index].1.clone();
506 let network_keypair = keypairs[own_index].0.clone();
507
508 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
509
510 let authority = ConsensusAuthority::start(
511 network_type,
512 0,
513 own_index,
514 committee,
515 parameters,
516 ProtocolConfig::get_for_max_version_UNSAFE(),
517 protocol_keypair,
518 network_keypair,
519 Arc::new(Clock::default()),
520 Arc::new(txn_verifier),
521 commit_consumer,
522 registry,
523 0,
524 )
525 .await;
526
527 assert_eq!(authority.context().own_index, own_index);
528 assert_eq!(authority.context().committee.epoch(), 0);
529 assert_eq!(authority.context().committee.size(), 1);
530
531 authority.stop().await;
532 }
533
534 #[rstest]
536 #[tokio::test(flavor = "current_thread")]
537 async fn test_authority_committee(
538 #[values(NetworkType::Tonic)] network_type: NetworkType,
539 #[values(5, 10)] gc_depth: u32,
540 ) {
541 telemetry_subscribers::init_for_testing();
542 let db_registry = Registry::new();
543 DBMetrics::init(RegistryService::new(db_registry));
544
545 const NUM_OF_AUTHORITIES: usize = 4;
546 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
547 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
548 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
549
550 let temp_dirs = (0..NUM_OF_AUTHORITIES)
551 .map(|_| TempDir::new().unwrap())
552 .collect::<Vec<_>>();
553
554 let mut commit_receivers = Vec::with_capacity(committee.size());
555 let mut authorities = Vec::with_capacity(committee.size());
556 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
557
558 for (index, _authority_info) in committee.authorities() {
559 let (authority, commit_receiver) = make_authority(
560 index,
561 &temp_dirs[index.value()],
562 committee.clone(),
563 keypairs.clone(),
564 network_type,
565 boot_counters[index],
566 protocol_config.clone(),
567 )
568 .await;
569 boot_counters[index] += 1;
570 commit_receivers.push(commit_receiver);
571 authorities.push(authority);
572 }
573
574 const NUM_TRANSACTIONS: u8 = 15;
575 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
576 for i in 0..NUM_TRANSACTIONS {
577 let txn = vec![i; 16];
578 submitted_transactions.insert(txn.clone());
579 authorities[i as usize % authorities.len()]
580 .transaction_client()
581 .submit(vec![txn])
582 .await
583 .unwrap();
584 }
585
586 for receiver in &mut commit_receivers {
587 let mut expected_transactions = submitted_transactions.clone();
588 loop {
589 let committed_subdag =
590 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
591 .await
592 .unwrap()
593 .unwrap();
594 for b in committed_subdag.blocks {
595 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
596 assert!(
597 expected_transactions.remove(&txn),
598 "Transaction not submitted or already seen: {:?}",
599 txn
600 );
601 }
602 }
603 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
604 if expected_transactions.is_empty() {
605 break;
606 }
607 }
608 }
609
610 let index = committee.to_authority_index(1).unwrap();
612 authorities.remove(index.value()).stop().await;
613 sleep(Duration::from_secs(10)).await;
614
615 let (authority, commit_receiver) = make_authority(
617 index,
618 &temp_dirs[index.value()],
619 committee.clone(),
620 keypairs.clone(),
621 network_type,
622 boot_counters[index],
623 protocol_config.clone(),
624 )
625 .await;
626 boot_counters[index] += 1;
627 commit_receivers[index] = commit_receiver;
628 authorities.insert(index.value(), authority);
629 sleep(Duration::from_secs(10)).await;
630
631 for authority in authorities {
633 authority.stop().await;
634 }
635 }
636
637 #[rstest]
638 #[tokio::test(flavor = "current_thread")]
639 async fn test_small_committee(
640 #[values(NetworkType::Tonic)] network_type: NetworkType,
641 #[values(1, 2, 3)] num_authorities: usize,
642 ) {
643 telemetry_subscribers::init_for_testing();
644 let db_registry = Registry::new();
645 DBMetrics::init(RegistryService::new(db_registry));
646
647 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
648 let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();
649
650 let temp_dirs = (0..num_authorities)
651 .map(|_| TempDir::new().unwrap())
652 .collect::<Vec<_>>();
653
654 let mut output_receivers = Vec::with_capacity(committee.size());
655 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
656 let mut boot_counters = vec![0; num_authorities];
657
658 for (index, _authority_info) in committee.authorities() {
659 let (authority, commit_receiver) = make_authority(
660 index,
661 &temp_dirs[index.value()],
662 committee.clone(),
663 keypairs.clone(),
664 network_type,
665 boot_counters[index],
666 protocol_config.clone(),
667 )
668 .await;
669 boot_counters[index] += 1;
670 output_receivers.push(commit_receiver);
671 authorities.push(authority);
672 }
673
674 const NUM_TRANSACTIONS: u8 = 15;
675 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
676 for i in 0..NUM_TRANSACTIONS {
677 let txn = vec![i; 16];
678 submitted_transactions.insert(txn.clone());
679 authorities[i as usize % authorities.len()]
680 .transaction_client()
681 .submit(vec![txn])
682 .await
683 .unwrap();
684 }
685
686 for receiver in &mut output_receivers {
687 let mut expected_transactions = submitted_transactions.clone();
688 loop {
689 let committed_subdag =
690 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
691 .await
692 .unwrap()
693 .unwrap();
694 for b in committed_subdag.blocks {
695 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
696 assert!(
697 expected_transactions.remove(&txn),
698 "Transaction not submitted or already seen: {:?}",
699 txn
700 );
701 }
702 }
703 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
704 if expected_transactions.is_empty() {
705 break;
706 }
707 }
708 }
709
710 let index = committee.to_authority_index(0).unwrap();
712 authorities.remove(index.value()).stop().await;
713 sleep(Duration::from_secs(10)).await;
714
715 let (authority, commit_receiver) = make_authority(
717 index,
718 &temp_dirs[index.value()],
719 committee.clone(),
720 keypairs.clone(),
721 network_type,
722 boot_counters[index],
723 protocol_config.clone(),
724 )
725 .await;
726 boot_counters[index] += 1;
727 output_receivers[index] = commit_receiver;
728 authorities.insert(index.value(), authority);
729 sleep(Duration::from_secs(10)).await;
730
731 for authority in authorities {
733 authority.stop().await;
734 }
735 }
736
737 #[rstest]
738 #[tokio::test(flavor = "current_thread")]
739 async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
740 telemetry_subscribers::init_for_testing();
741 let db_registry = Registry::new();
742 DBMetrics::init(RegistryService::new(db_registry));
743
744 const NUM_OF_AUTHORITIES: usize = 4;
745 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
746 let mut commit_receivers = vec![];
747 let mut authorities = BTreeMap::new();
748 let mut temp_dirs = BTreeMap::new();
749 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
750
751 let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
752 protocol_config.set_consensus_gc_depth_for_testing(gc_depth);
753
754 for (index, _authority_info) in committee.authorities() {
755 let dir = TempDir::new().unwrap();
756 let (authority, commit_receiver) = make_authority(
757 index,
758 &dir,
759 committee.clone(),
760 keypairs.clone(),
761 NetworkType::Tonic,
762 boot_counters[index],
763 protocol_config.clone(),
764 )
765 .await;
766 boot_counters[index] += 1;
767 commit_receivers.push(commit_receiver);
768 authorities.insert(index, authority);
769 temp_dirs.insert(index, dir);
770 }
771
772 let index_1 = committee.to_authority_index(1).unwrap();
776 'outer: while let Some(result) =
777 timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
778 .await
779 .expect("Timed out while waiting for at least one committed block from authority 1")
780 {
781 for block in result.blocks {
782 if block.round() > GENESIS_ROUND && block.author() == index_1 {
783 break 'outer;
784 }
785 }
786 }
787
788 authorities.remove(&index_1).unwrap().stop().await;
794 let index_2 = committee.to_authority_index(2).unwrap();
795 authorities.remove(&index_2).unwrap().stop().await;
796 sleep(Duration::from_secs(5)).await;
797
798 let dir = TempDir::new().unwrap();
802 boot_counters[index_1] = 0;
804 let (authority, mut commit_receiver) = make_authority(
805 index_1,
806 &dir,
807 committee.clone(),
808 keypairs.clone(),
809 NetworkType::Tonic,
810 boot_counters[index_1],
811 protocol_config.clone(),
812 )
813 .await;
814 boot_counters[index_1] += 1;
815 authorities.insert(index_1, authority);
816 temp_dirs.insert(index_1, dir);
817 sleep(Duration::from_secs(5)).await;
818
819 let (authority, _commit_receiver) = make_authority(
822 index_2,
823 &temp_dirs[&index_2],
824 committee.clone(),
825 keypairs,
826 NetworkType::Tonic,
827 boot_counters[index_2],
828 protocol_config.clone(),
829 )
830 .await;
831 boot_counters[index_2] += 1;
832 authorities.insert(index_2, authority);
833 sleep(Duration::from_secs(5)).await;
834
835 'outer: while let Some(result) = commit_receiver.recv().await {
837 for block in result.blocks {
838 if block.round() > GENESIS_ROUND && block.author() == index_1 {
839 break 'outer;
840 }
841 }
842 }
843
844 for (_, authority) in authorities {
846 authority.stop().await;
847 }
848 }
849
850 async fn make_authority(
852 index: AuthorityIndex,
853 db_dir: &TempDir,
854 committee: Committee,
855 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
856 network_type: NetworkType,
857 boot_counter: u64,
858 protocol_config: ProtocolConfig,
859 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
860 let registry = Registry::new();
861
862 let parameters = Parameters {
864 db_path: db_dir.path().to_path_buf(),
865 dag_state_cached_rounds: 5,
866 commit_sync_parallel_fetches: 2,
867 commit_sync_batch_size: 3,
868 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
869 ..Default::default()
870 };
871 let txn_verifier = NoopTransactionVerifier {};
872
873 let protocol_keypair = keypairs[index].1.clone();
874 let network_keypair = keypairs[index].0.clone();
875
876 let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
877
878 let authority = ConsensusAuthority::start(
879 network_type,
880 0,
881 index,
882 committee,
883 parameters,
884 protocol_config,
885 protocol_keypair,
886 network_keypair,
887 Arc::new(Clock::default()),
888 Arc::new(txn_verifier),
889 commit_consumer,
890 registry,
891 boot_counter,
892 )
893 .await;
894
895 (authority, commit_receiver)
896 }
897}