1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7 AuthorityIndex, ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
8 NetworkPublicKey, Parameters, ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18 BlockAPI as _, CommitConsumerArgs,
19 authority_service::AuthorityService,
20 block_manager::BlockManager,
21 block_verifier::SignedBlockVerifier,
22 commit_observer::CommitObserver,
23 commit_syncer::{CommitSyncer, CommitSyncerHandle},
24 commit_vote_monitor::CommitVoteMonitor,
25 context::{Clock, Context},
26 core::{Core, CoreSignals},
27 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
28 dag_state::DagState,
29 leader_schedule::LeaderSchedule,
30 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
31 metrics::initialise_metrics,
32 network::{
33 CommitSyncerClient, NetworkManager, SynchronizerClient, tonic_network::TonicManager,
34 },
35 observer_service::ObserverService,
36 round_prober::{RoundProber, RoundProberHandle},
37 round_tracker::RoundTracker,
38 storage::rocksdb_store::RocksDBStore,
39 subscriber::Subscriber,
40 synchronizer::{Synchronizer, SynchronizerHandle},
41 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
42 transaction_vote_tracker::TransactionVoteTracker,
43};
44
45#[allow(private_interfaces)]
48pub enum ConsensusAuthority {
49 WithTonic(AuthorityNode<TonicManager>),
50}
51
52impl ConsensusAuthority {
53 pub async fn start(
54 network_type: NetworkType,
55 epoch_start_timestamp_ms: u64,
56 own_index: AuthorityIndex,
57 committee: Committee,
58 parameters: Parameters,
59 protocol_config: ConsensusProtocolConfig,
60 protocol_keypair: ProtocolKeyPair,
61 network_keypair: NetworkKeyPair,
62 clock: Arc<Clock>,
63 transaction_verifier: Arc<dyn TransactionVerifier>,
64 commit_consumer: CommitConsumerArgs,
65 registry: Registry,
66 boot_counter: u64,
70 ) -> Self {
71 match network_type {
72 NetworkType::Tonic => {
73 let authority = AuthorityNode::start(
74 epoch_start_timestamp_ms,
75 own_index,
76 committee,
77 parameters,
78 protocol_config,
79 protocol_keypair,
80 network_keypair,
81 clock,
82 transaction_verifier,
83 commit_consumer,
84 registry,
85 boot_counter,
86 )
87 .await;
88 Self::WithTonic(authority)
89 }
90 }
91 }
92
93 pub async fn stop(self) {
94 match self {
95 Self::WithTonic(authority) => authority.stop().await,
96 }
97 }
98
99 pub fn update_peer_address(
100 &self,
101 network_pubkey: NetworkPublicKey,
102 address: Option<Multiaddr>,
103 ) {
104 match self {
105 Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
106 }
107 }
108
109 pub fn transaction_client(&self) -> Arc<TransactionClient> {
110 match self {
111 Self::WithTonic(authority) => authority.transaction_client(),
112 }
113 }
114
115 #[cfg(test)]
116 fn context(&self) -> &Arc<Context> {
117 match self {
118 Self::WithTonic(authority) => &authority.context,
119 }
120 }
121}
122
123#[derive(Clone, Copy, PartialEq, Eq, Debug)]
124pub enum NetworkType {
125 Tonic,
126}
127
128pub(crate) struct AuthorityNode<N>
129where
130 N: NetworkManager,
131{
132 context: Arc<Context>,
133 start_time: Instant,
134 transaction_client: Arc<TransactionClient>,
135 synchronizer: Arc<SynchronizerHandle>,
136
137 commit_syncer_handle: CommitSyncerHandle,
138 round_prober_handle: RoundProberHandle,
139 leader_timeout_handle: LeaderTimeoutTaskHandle,
140 core_thread_handle: CoreThreadHandle,
141 subscriber: Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>,
142 network_manager: N,
143}
144
145impl<N> AuthorityNode<N>
146where
147 N: NetworkManager,
148{
149 pub(crate) async fn start(
151 epoch_start_timestamp_ms: u64,
152 own_index: AuthorityIndex,
153 committee: Committee,
154 parameters: Parameters,
155 protocol_config: ConsensusProtocolConfig,
156 protocol_keypair: ProtocolKeyPair,
157 network_keypair: NetworkKeyPair,
158 clock: Arc<Clock>,
159 transaction_verifier: Arc<dyn TransactionVerifier>,
160 commit_consumer: CommitConsumerArgs,
161 registry: Registry,
162 boot_counter: u64,
163 ) -> Self {
164 assert!(
165 committee.is_valid_index(own_index),
166 "Invalid own index {}",
167 own_index
168 );
169 let own_hostname = committee.authority(own_index).hostname.clone();
170 info!(
171 "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
172 own_index,
173 own_hostname,
174 protocol_config.protocol_version(),
175 epoch_start_timestamp_ms,
176 boot_counter,
177 commit_consumer.replay_after_commit_index,
178 commit_consumer.consumer_last_processed_commit_index
179 );
180 info!(
181 "Consensus authorities: {}",
182 committee
183 .authorities()
184 .map(|(i, a)| format!("{}: {}", i, a.hostname))
185 .join(", ")
186 );
187 info!("Consensus parameters: {:?}", parameters);
188 info!("Consensus committee: {:?}", committee);
189 let context = Arc::new(Context::new(
190 epoch_start_timestamp_ms,
191 own_index,
192 committee,
193 parameters,
194 protocol_config,
195 initialise_metrics(registry),
196 clock,
197 ));
198 let start_time = Instant::now();
199
200 context
201 .metrics
202 .node_metrics
203 .authority_index
204 .with_label_values(&[&own_hostname])
205 .set(context.own_index.value() as i64);
206 context
207 .metrics
208 .node_metrics
209 .protocol_version
210 .set(context.protocol_config.protocol_version() as i64);
211
212 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
213 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
214
215 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
216
217 let mut network_manager = N::new(context.clone(), network_keypair);
218 let validator_client = network_manager.validator_client();
219
220 let synchronizer_client = Arc::new(SynchronizerClient::<
221 N::ValidatorClient,
222 N::ObserverClient,
223 >::new(
224 context.clone(),
225 Some(validator_client.clone()),
226 None, ));
228 let commit_syncer_client = Arc::new(CommitSyncerClient::<
229 N::ValidatorClient,
230 N::ObserverClient,
231 >::new(
232 context.clone(),
233 Some(validator_client.clone()),
234 None, ));
236
237 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
238 let use_fifo_compaction = context.parameters.use_fifo_compaction
239 && (context.protocol_config.chain() != ChainType::Mainnet
240 || context.own_index.value().is_multiple_of(10));
241 let store = Arc::new(RocksDBStore::new(store_path, use_fifo_compaction));
242 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
243
244 let block_verifier = Arc::new(SignedBlockVerifier::new(
245 context.clone(),
246 transaction_verifier,
247 ));
248
249 let transaction_vote_tracker =
250 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
251
252 let sync_last_known_own_block = boot_counter == 0
253 && !context
254 .parameters
255 .sync_last_known_own_block_timeout
256 .is_zero();
257 info!(
258 "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
259 sync_last_known_own_block,
260 boot_counter,
261 context.parameters.sync_last_known_own_block_timeout
262 );
263
264 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
265
266 let leader_schedule = Arc::new(LeaderSchedule::from_store(
267 context.clone(),
268 dag_state.clone(),
269 ));
270
271 let commit_consumer_monitor = commit_consumer.monitor();
272 let commit_observer = CommitObserver::new(
273 context.clone(),
274 commit_consumer,
275 dag_state.clone(),
276 transaction_vote_tracker.clone(),
277 leader_schedule.clone(),
278 )
279 .await;
280
281 let initial_received_rounds = dag_state
282 .read()
283 .get_last_cached_block_per_authority(Round::MAX)
284 .into_iter()
285 .map(|(block, _)| block.round())
286 .collect::<Vec<_>>();
287 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
288 context.clone(),
289 initial_received_rounds,
290 )));
291
292 let core = Core::new(
295 context.clone(),
296 leader_schedule,
297 tx_consumer,
298 transaction_vote_tracker.clone(),
299 block_manager,
300 commit_observer,
301 core_signals,
302 protocol_keypair,
303 dag_state.clone(),
304 sync_last_known_own_block,
305 round_tracker.clone(),
306 );
307
308 let (core_dispatcher, core_thread_handle) =
309 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
310 let core_dispatcher = Arc::new(core_dispatcher);
311 let leader_timeout_handle =
312 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
313
314 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
315
316 let synchronizer = Synchronizer::start(
317 synchronizer_client.clone(),
318 context.clone(),
319 core_dispatcher.clone(),
320 commit_vote_monitor.clone(),
321 block_verifier.clone(),
322 transaction_vote_tracker.clone(),
323 round_tracker.clone(),
324 dag_state.clone(),
325 sync_last_known_own_block,
326 );
327
328 let commit_syncer_handle = CommitSyncer::new(
329 context.clone(),
330 core_dispatcher.clone(),
331 commit_vote_monitor.clone(),
332 commit_consumer_monitor.clone(),
333 block_verifier.clone(),
334 transaction_vote_tracker.clone(),
335 round_tracker.clone(),
336 commit_syncer_client.clone(),
337 dag_state.clone(),
338 )
339 .start();
340
341 let round_prober_handle = RoundProber::new(
342 context.clone(),
343 core_dispatcher.clone(),
344 round_tracker.clone(),
345 dag_state.clone(),
346 validator_client.clone(),
347 )
348 .start();
349
350 let network_service = Arc::new(AuthorityService::new(
351 context.clone(),
352 block_verifier,
353 commit_vote_monitor,
354 round_tracker.clone(),
355 synchronizer.clone(),
356 core_dispatcher,
357 signals_receivers.block_broadcast_receiver(),
358 transaction_vote_tracker,
359 dag_state.clone(),
360 store.clone(),
361 ));
362
363 let subscriber = {
364 let s = Subscriber::new(
365 context.clone(),
366 validator_client,
367 network_service.clone(),
368 dag_state.clone(),
369 );
370 for (peer, _) in context.committee.authorities() {
371 if peer != context.own_index {
372 s.subscribe(peer);
373 }
374 }
375 s
376 };
377
378 network_manager
379 .start_validator_server(network_service.clone())
380 .await;
381 if context.parameters.tonic.is_observer_server_enabled() {
382 let observer_service = Arc::new(ObserverService::new(
383 context.clone(),
384 dag_state.clone(),
385 signals_receivers.accepted_block_broadcast_receiver(),
386 ));
387 network_manager
388 .start_observer_server(observer_service)
389 .await;
390 }
391
392 info!(
393 "Consensus authority started, took {:?}",
394 start_time.elapsed()
395 );
396
397 Self {
398 context,
399 start_time,
400 transaction_client: Arc::new(tx_client),
401 synchronizer,
402 commit_syncer_handle,
403 round_prober_handle,
404 leader_timeout_handle,
405 core_thread_handle,
406 subscriber,
407 network_manager,
408 }
409 }
410
411 pub(crate) async fn stop(mut self) {
412 info!(
413 "Stopping authority. Total run time: {:?}",
414 self.start_time.elapsed()
415 );
416
417 if let Err(e) = self.synchronizer.stop().await {
419 if e.is_panic() {
420 std::panic::resume_unwind(e.into_panic());
421 }
422 warn!(
423 "Failed to stop synchronizer when shutting down consensus: {:?}",
424 e
425 );
426 };
427 self.commit_syncer_handle.stop().await;
428 self.round_prober_handle.stop().await;
429 self.leader_timeout_handle.stop().await;
430 self.core_thread_handle.stop().await;
432 self.subscriber.stop();
434 self.network_manager.stop().await;
435
436 self.context
437 .metrics
438 .node_metrics
439 .uptime
440 .observe(self.start_time.elapsed().as_secs_f64());
441 }
442
443 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
444 self.transaction_client.clone()
445 }
446
447 pub(crate) fn update_peer_address(
448 &self,
449 network_pubkey: NetworkPublicKey,
450 address: Option<Multiaddr>,
451 ) {
452 let Some(peer) = self
454 .context
455 .committee
456 .authorities()
457 .find(|(_, authority)| authority.network_key == network_pubkey)
458 .map(|(index, _)| index)
459 else {
460 warn!(
461 "Network public key {:?} not found in committee, ignoring address update",
462 network_pubkey
463 );
464 return;
465 };
466
467 self.network_manager.update_peer_address(peer, address);
469
470 if peer != self.context.own_index {
472 info!("Re-subscribing to peer {} after address update", peer);
473 self.subscriber.subscribe(peer);
474 }
475 }
476}
477
478#[cfg(test)]
479mod tests {
480 #![allow(non_snake_case)]
481
482 use std::{
483 collections::{BTreeMap, BTreeSet},
484 sync::Arc,
485 time::Duration,
486 };
487
488 use consensus_config::{Parameters, local_committee_and_keys};
489 use mysten_metrics::RegistryService;
490 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
491 use prometheus::Registry;
492 use rstest::rstest;
493 use tempfile::TempDir;
494 use tokio::time::{sleep, timeout};
495 use typed_store::DBMetrics;
496
497 use super::*;
498 use crate::{
499 CommittedSubDag,
500 block::{BlockAPI as _, GENESIS_ROUND},
501 transaction::NoopTransactionVerifier,
502 };
503
504 #[rstest]
505 #[tokio::test]
506 async fn test_authority_start_and_stop(
507 #[values(NetworkType::Tonic)] network_type: NetworkType,
508 ) {
509 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
510 let registry = Registry::new();
511
512 let temp_dir = TempDir::new().unwrap();
513 let parameters = Parameters {
514 db_path: temp_dir.keep(),
515 ..Default::default()
516 };
517 let txn_verifier = NoopTransactionVerifier {};
518
519 let own_index = committee.to_authority_index(0).unwrap();
520 let protocol_keypair = keypairs[own_index].1.clone();
521 let network_keypair = keypairs[own_index].0.clone();
522
523 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
524
525 let authority = ConsensusAuthority::start(
526 network_type,
527 0,
528 own_index,
529 committee,
530 parameters,
531 ConsensusProtocolConfig::for_testing(),
532 protocol_keypair,
533 network_keypair,
534 Arc::new(Clock::default()),
535 Arc::new(txn_verifier),
536 commit_consumer,
537 registry,
538 0,
539 )
540 .await;
541
542 assert_eq!(authority.context().own_index, own_index);
543 assert_eq!(authority.context().committee.epoch(), 0);
544 assert_eq!(authority.context().committee.size(), 1);
545
546 authority.stop().await;
547 }
548
549 #[rstest]
551 #[tokio::test(flavor = "current_thread")]
552 async fn test_authority_committee(
553 #[values(NetworkType::Tonic)] network_type: NetworkType,
554 #[values(5, 10)] gc_depth: u32,
555 ) {
556 telemetry_subscribers::init_for_testing();
557 let db_registry = Registry::new();
558 DBMetrics::init(RegistryService::new(db_registry));
559
560 const NUM_OF_AUTHORITIES: usize = 4;
561 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
562 let mut protocol_config = ConsensusProtocolConfig::for_testing();
563 protocol_config.set_gc_depth_for_testing(gc_depth);
564
565 let temp_dirs = (0..NUM_OF_AUTHORITIES)
566 .map(|_| TempDir::new().unwrap())
567 .collect::<Vec<_>>();
568
569 let mut commit_receivers = Vec::with_capacity(committee.size());
570 let mut authorities = Vec::with_capacity(committee.size());
571 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
572
573 for (index, _authority_info) in committee.authorities() {
574 let (authority, commit_receiver) = make_authority(
575 index,
576 &temp_dirs[index.value()],
577 committee.clone(),
578 keypairs.clone(),
579 network_type,
580 boot_counters[index],
581 protocol_config.clone(),
582 )
583 .await;
584 boot_counters[index] += 1;
585 commit_receivers.push(commit_receiver);
586 authorities.push(authority);
587 }
588
589 const NUM_TRANSACTIONS: u8 = 15;
590 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
591 for i in 0..NUM_TRANSACTIONS {
592 let txn = vec![i; 16];
593 submitted_transactions.insert(txn.clone());
594 authorities[i as usize % authorities.len()]
595 .transaction_client()
596 .submit(vec![txn])
597 .await
598 .unwrap();
599 }
600
601 for receiver in &mut commit_receivers {
602 let mut expected_transactions = submitted_transactions.clone();
603 loop {
604 let committed_subdag =
605 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
606 .await
607 .unwrap()
608 .unwrap();
609 for b in committed_subdag.blocks {
610 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
611 assert!(
612 expected_transactions.remove(&txn),
613 "Transaction not submitted or already seen: {:?}",
614 txn
615 );
616 }
617 }
618 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
619 if expected_transactions.is_empty() {
620 break;
621 }
622 }
623 }
624
625 let index = committee.to_authority_index(1).unwrap();
627 authorities.remove(index.value()).stop().await;
628 sleep(Duration::from_secs(10)).await;
629
630 let (authority, commit_receiver) = make_authority(
632 index,
633 &temp_dirs[index.value()],
634 committee.clone(),
635 keypairs.clone(),
636 network_type,
637 boot_counters[index],
638 protocol_config.clone(),
639 )
640 .await;
641 boot_counters[index] += 1;
642 commit_receivers[index] = commit_receiver;
643 authorities.insert(index.value(), authority);
644 sleep(Duration::from_secs(10)).await;
645
646 for authority in authorities {
648 authority.stop().await;
649 }
650 }
651
652 #[rstest]
653 #[tokio::test(flavor = "current_thread")]
654 async fn test_small_committee(
655 #[values(NetworkType::Tonic)] network_type: NetworkType,
656 #[values(1, 2, 3)] num_authorities: usize,
657 ) {
658 telemetry_subscribers::init_for_testing();
659 let db_registry = Registry::new();
660 DBMetrics::init(RegistryService::new(db_registry));
661
662 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
663 let protocol_config = ConsensusProtocolConfig::for_testing();
664
665 let temp_dirs = (0..num_authorities)
666 .map(|_| TempDir::new().unwrap())
667 .collect::<Vec<_>>();
668
669 let mut output_receivers = Vec::with_capacity(committee.size());
670 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
671 let mut boot_counters = vec![0; num_authorities];
672
673 for (index, _authority_info) in committee.authorities() {
674 let (authority, commit_receiver) = make_authority(
675 index,
676 &temp_dirs[index.value()],
677 committee.clone(),
678 keypairs.clone(),
679 network_type,
680 boot_counters[index],
681 protocol_config.clone(),
682 )
683 .await;
684 boot_counters[index] += 1;
685 output_receivers.push(commit_receiver);
686 authorities.push(authority);
687 }
688
689 const NUM_TRANSACTIONS: u8 = 15;
690 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
691 for i in 0..NUM_TRANSACTIONS {
692 let txn = vec![i; 16];
693 submitted_transactions.insert(txn.clone());
694 authorities[i as usize % authorities.len()]
695 .transaction_client()
696 .submit(vec![txn])
697 .await
698 .unwrap();
699 }
700
701 for receiver in &mut output_receivers {
702 let mut expected_transactions = submitted_transactions.clone();
703 loop {
704 let committed_subdag =
705 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
706 .await
707 .unwrap()
708 .unwrap();
709 for b in committed_subdag.blocks {
710 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
711 assert!(
712 expected_transactions.remove(&txn),
713 "Transaction not submitted or already seen: {:?}",
714 txn
715 );
716 }
717 }
718 assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
719 if expected_transactions.is_empty() {
720 break;
721 }
722 }
723 }
724
725 let index = committee.to_authority_index(0).unwrap();
727 authorities.remove(index.value()).stop().await;
728 sleep(Duration::from_secs(10)).await;
729
730 let (authority, commit_receiver) = make_authority(
732 index,
733 &temp_dirs[index.value()],
734 committee.clone(),
735 keypairs.clone(),
736 network_type,
737 boot_counters[index],
738 protocol_config.clone(),
739 )
740 .await;
741 boot_counters[index] += 1;
742 output_receivers[index] = commit_receiver;
743 authorities.insert(index.value(), authority);
744 sleep(Duration::from_secs(10)).await;
745
746 for authority in authorities {
748 authority.stop().await;
749 }
750 }
751
752 #[rstest]
753 #[tokio::test(flavor = "current_thread")]
754 async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
755 telemetry_subscribers::init_for_testing();
756 let db_registry = Registry::new();
757 DBMetrics::init(RegistryService::new(db_registry));
758
759 const NUM_OF_AUTHORITIES: usize = 4;
760 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
761 let mut commit_receivers = vec![];
762 let mut authorities = BTreeMap::new();
763 let mut temp_dirs = BTreeMap::new();
764 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
765
766 let mut protocol_config = ConsensusProtocolConfig::for_testing();
767 protocol_config.set_gc_depth_for_testing(gc_depth);
768
769 for (index, _authority_info) in committee.authorities() {
770 let dir = TempDir::new().unwrap();
771 let (authority, commit_receiver) = make_authority(
772 index,
773 &dir,
774 committee.clone(),
775 keypairs.clone(),
776 NetworkType::Tonic,
777 boot_counters[index],
778 protocol_config.clone(),
779 )
780 .await;
781 boot_counters[index] += 1;
782 commit_receivers.push(commit_receiver);
783 authorities.insert(index, authority);
784 temp_dirs.insert(index, dir);
785 }
786
787 let index_1 = committee.to_authority_index(1).unwrap();
791 'outer: while let Some(result) =
792 timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
793 .await
794 .expect("Timed out while waiting for at least one committed block from authority 1")
795 {
796 for block in result.blocks {
797 if block.round() > GENESIS_ROUND && block.author() == index_1 {
798 break 'outer;
799 }
800 }
801 }
802
803 authorities.remove(&index_1).unwrap().stop().await;
809 let index_2 = committee.to_authority_index(2).unwrap();
810 authorities.remove(&index_2).unwrap().stop().await;
811 sleep(Duration::from_secs(5)).await;
812
813 let dir = TempDir::new().unwrap();
817 boot_counters[index_1] = 0;
819 let (authority, mut commit_receiver) = make_authority(
820 index_1,
821 &dir,
822 committee.clone(),
823 keypairs.clone(),
824 NetworkType::Tonic,
825 boot_counters[index_1],
826 protocol_config.clone(),
827 )
828 .await;
829 boot_counters[index_1] += 1;
830 authorities.insert(index_1, authority);
831 temp_dirs.insert(index_1, dir);
832 sleep(Duration::from_secs(5)).await;
833
834 let (authority, _commit_receiver) = make_authority(
837 index_2,
838 &temp_dirs[&index_2],
839 committee.clone(),
840 keypairs,
841 NetworkType::Tonic,
842 boot_counters[index_2],
843 protocol_config.clone(),
844 )
845 .await;
846 boot_counters[index_2] += 1;
847 authorities.insert(index_2, authority);
848 sleep(Duration::from_secs(5)).await;
849
850 'outer: while let Some(result) = commit_receiver.recv().await {
852 for block in result.blocks {
853 if block.round() > GENESIS_ROUND && block.author() == index_1 {
854 break 'outer;
855 }
856 }
857 }
858
859 for (_, authority) in authorities {
861 authority.stop().await;
862 }
863 }
864
865 async fn make_authority(
867 index: AuthorityIndex,
868 db_dir: &TempDir,
869 committee: Committee,
870 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
871 network_type: NetworkType,
872 boot_counter: u64,
873 protocol_config: ConsensusProtocolConfig,
874 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
875 let registry = Registry::new();
876
877 let parameters = Parameters {
879 db_path: db_dir.path().to_path_buf(),
880 dag_state_cached_rounds: 5,
881 commit_sync_parallel_fetches: 2,
882 commit_sync_batch_size: 3,
883 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
884 ..Default::default()
885 };
886 let txn_verifier = NoopTransactionVerifier {};
887
888 let protocol_keypair = keypairs[index].1.clone();
889 let network_keypair = keypairs[index].0.clone();
890
891 let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
892
893 let authority = ConsensusAuthority::start(
894 network_type,
895 0,
896 index,
897 committee,
898 parameters,
899 protocol_config,
900 protocol_keypair,
901 network_keypair,
902 Arc::new(Clock::default()),
903 Arc::new(txn_verifier),
904 commit_consumer,
905 registry,
906 boot_counter,
907 )
908 .await;
909
910 (authority, commit_receiver)
911 }
912}