1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{AuthorityIndex, Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
7use itertools::Itertools;
8use mysten_metrics::spawn_logged_monitored_task;
9use parking_lot::RwLock;
10use prometheus::Registry;
11use sui_protocol_config::{ConsensusNetwork, ProtocolConfig};
12use tokio::task::JoinHandle;
13use tracing::{info, warn};
14
15use crate::{
16 CommitConsumerArgs,
17 authority_service::AuthorityService,
18 block_manager::BlockManager,
19 block_verifier::SignedBlockVerifier,
20 broadcaster::Broadcaster,
21 commit_observer::CommitObserver,
22 commit_syncer::{CommitSyncer, CommitSyncerHandle},
23 commit_vote_monitor::CommitVoteMonitor,
24 context::{Clock, Context},
25 core::{Core, CoreSignals},
26 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
27 dag_state::DagState,
28 leader_schedule::LeaderSchedule,
29 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
30 metrics::initialise_metrics,
31 network::{
32 NetworkClient as _, NetworkManager, anemo_network::AnemoManager,
33 tonic_network::TonicManager,
34 },
35 proposed_block_handler::ProposedBlockHandler,
36 round_prober::{RoundProber, RoundProberHandle},
37 round_tracker::PeerRoundTracker,
38 storage::rocksdb_store::RocksDBStore,
39 subscriber::Subscriber,
40 synchronizer::{Synchronizer, SynchronizerHandle},
41 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
42 transaction_certifier::TransactionCertifier,
43};
44
/// Public handle to a running consensus authority.
///
/// Each variant wraps an `AuthorityNode` specialised for one network manager
/// implementation. The variant is selected in `ConsensusAuthority::start` from the
/// requested `ConsensusNetwork`.
// `AuthorityNode` is only `pub(crate)` while this enum is `pub`, so the variant payloads
// expose a less-visible type; the lint is silenced deliberately.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Authority node whose network manager is `AnemoManager`.
    WithAnemo(AuthorityNode<AnemoManager>),
    /// Authority node whose network manager is `TonicManager`.
    WithTonic(AuthorityNode<TonicManager>),
}
52
53impl ConsensusAuthority {
54 pub async fn start(
55 network_type: ConsensusNetwork,
56 epoch_start_timestamp_ms: u64,
57 own_index: AuthorityIndex,
58 committee: Committee,
59 parameters: Parameters,
60 protocol_config: ProtocolConfig,
61 protocol_keypair: ProtocolKeyPair,
62 network_keypair: NetworkKeyPair,
63 clock: Arc<Clock>,
64 transaction_verifier: Arc<dyn TransactionVerifier>,
65 commit_consumer: CommitConsumerArgs,
66 registry: Registry,
67 boot_counter: u64,
72 ) -> Self {
73 match network_type {
74 ConsensusNetwork::Anemo => {
75 let authority = AuthorityNode::start(
76 epoch_start_timestamp_ms,
77 own_index,
78 committee,
79 parameters,
80 protocol_config,
81 protocol_keypair,
82 network_keypair,
83 clock,
84 transaction_verifier,
85 commit_consumer,
86 registry,
87 boot_counter,
88 )
89 .await;
90 Self::WithAnemo(authority)
91 }
92 ConsensusNetwork::Tonic => {
93 let authority = AuthorityNode::start(
94 epoch_start_timestamp_ms,
95 own_index,
96 committee,
97 parameters,
98 protocol_config,
99 protocol_keypair,
100 network_keypair,
101 clock,
102 transaction_verifier,
103 commit_consumer,
104 registry,
105 boot_counter,
106 )
107 .await;
108 Self::WithTonic(authority)
109 }
110 }
111 }
112
113 pub async fn stop(self) {
114 match self {
115 Self::WithAnemo(authority) => authority.stop().await,
116 Self::WithTonic(authority) => authority.stop().await,
117 }
118 }
119
120 pub fn transaction_client(&self) -> Arc<TransactionClient> {
121 match self {
122 Self::WithAnemo(authority) => authority.transaction_client(),
123 Self::WithTonic(authority) => authority.transaction_client(),
124 }
125 }
126
127 #[cfg(test)]
128 fn context(&self) -> &Arc<Context> {
129 match self {
130 Self::WithAnemo(authority) => &authority.context,
131 Self::WithTonic(authority) => &authority.context,
132 }
133 }
134
135 #[allow(unused)]
136 fn sync_last_known_own_block_enabled(&self) -> bool {
137 match self {
138 Self::WithAnemo(authority) => authority.sync_last_known_own_block,
139 Self::WithTonic(authority) => authority.sync_last_known_own_block,
140 }
141 }
142}
143
/// A running consensus authority, generic over the network manager `N`.
///
/// Holds the handles of the components created in `start` so that `stop` can shut them
/// down.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
{
    /// Context shared by the components of this node.
    context: Arc<Context>,
    /// Instant captured in `start`; used to log run time and to record the uptime metric.
    start_time: Instant,
    /// Client handed out by `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Handle used to stop the synchronizer.
    synchronizer: Arc<SynchronizerHandle>,

    /// Handle used to stop the commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle used to stop the round prober.
    round_prober_handle: RoundProberHandle,
    /// Join handle of the spawned `ProposedBlockHandler` task; aborted on stop.
    proposed_block_handler: JoinHandle<()>,
    /// Handle used to stop the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle used to stop the core thread.
    core_thread_handle: CoreThreadHandle,
    /// Present only when `N::Client::SUPPORT_STREAMING` is false.
    broadcaster: Option<Broadcaster>,
    /// Present only when `N::Client::SUPPORT_STREAMING` is true.
    subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
    /// Network manager on which the authority service is installed.
    network_manager: N,
    /// Whether syncing of the last known own block was enabled when the node started.
    sync_last_known_own_block: bool,
}
165
166impl<N> AuthorityNode<N>
167where
168 N: NetworkManager<AuthorityService<ChannelCoreThreadDispatcher>>,
169{
170 pub(crate) async fn start(
171 epoch_start_timestamp_ms: u64,
172 own_index: AuthorityIndex,
173 committee: Committee,
174 parameters: Parameters,
175 protocol_config: ProtocolConfig,
176 protocol_keypair: ProtocolKeyPair,
179 network_keypair: NetworkKeyPair,
180 clock: Arc<Clock>,
181 transaction_verifier: Arc<dyn TransactionVerifier>,
182 commit_consumer: CommitConsumerArgs,
183 registry: Registry,
184 boot_counter: u64,
185 ) -> Self {
186 assert!(
187 committee.is_valid_index(own_index),
188 "Invalid own index {}",
189 own_index
190 );
191 let own_hostname = committee.authority(own_index).hostname.clone();
192 info!(
193 "Starting consensus authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
194 own_index,
195 own_hostname,
196 protocol_config.version,
197 epoch_start_timestamp_ms,
198 boot_counter,
199 commit_consumer.replay_after_commit_index,
200 commit_consumer.consumer_last_processed_commit_index
201 );
202 info!(
203 "Consensus authorities: {}",
204 committee
205 .authorities()
206 .map(|(i, a)| format!("{}: {}", i, a.hostname))
207 .join(", ")
208 );
209 info!("Consensus parameters: {:?}", parameters);
210 info!("Consensus committee: {:?}", committee);
211 let context = Arc::new(Context::new(
212 epoch_start_timestamp_ms,
213 own_index,
214 committee,
215 parameters,
216 protocol_config,
217 initialise_metrics(registry),
218 clock,
219 ));
220 let start_time = Instant::now();
221
222 context
223 .metrics
224 .node_metrics
225 .authority_index
226 .with_label_values(&[&own_hostname])
227 .set(context.own_index.value() as i64);
228 context
229 .metrics
230 .node_metrics
231 .protocol_version
232 .set(context.protocol_config.version.as_u64() as i64);
233
234 let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
235 let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());
236
237 let (core_signals, signals_receivers) = CoreSignals::new(context.clone());
238
239 let mut network_manager = N::new(context.clone(), network_keypair);
240 let network_client = network_manager.client();
241
242 let broadcaster = if N::Client::SUPPORT_STREAMING {
245 None
246 } else {
247 Some(Broadcaster::new(
248 context.clone(),
249 network_client.clone(),
250 &signals_receivers,
251 ))
252 };
253
254 let store_path = context.parameters.db_path.as_path().to_str().unwrap();
255 let store = Arc::new(RocksDBStore::new(store_path));
256 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
257
258 let block_verifier = Arc::new(SignedBlockVerifier::new(
259 context.clone(),
260 transaction_verifier,
261 ));
262
263 let transaction_certifier = TransactionCertifier::new(
264 context.clone(),
265 block_verifier.clone(),
266 dag_state.clone(),
267 commit_consumer.block_sender.clone(),
268 );
269
270 let mut proposed_block_handler = ProposedBlockHandler::new(
271 context.clone(),
272 signals_receivers.block_broadcast_receiver(),
273 transaction_certifier.clone(),
274 );
275
276 let proposed_block_handler =
277 spawn_logged_monitored_task!(proposed_block_handler.run(), "proposed_block_handler");
278
279 let sync_last_known_own_block = boot_counter == 0
280 && dag_state.read().highest_accepted_round() == 0
281 && !context
282 .parameters
283 .sync_last_known_own_block_timeout
284 .is_zero();
285 info!("Sync last known own block: {sync_last_known_own_block}");
286
287 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
288
289 let leader_schedule = Arc::new(LeaderSchedule::from_store(
290 context.clone(),
291 dag_state.clone(),
292 ));
293
294 let commit_consumer_monitor = commit_consumer.monitor();
295 let commit_observer = CommitObserver::new(
296 context.clone(),
297 commit_consumer,
298 dag_state.clone(),
299 transaction_certifier.clone(),
300 leader_schedule.clone(),
301 )
302 .await;
303
304 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
305
306 let core = Core::new(
307 context.clone(),
308 leader_schedule,
309 tx_consumer,
310 transaction_certifier.clone(),
311 block_manager,
312 !N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
316 commit_observer,
317 core_signals,
318 protocol_keypair,
319 dag_state.clone(),
320 sync_last_known_own_block,
321 round_tracker.clone(),
322 );
323
324 let (core_dispatcher, core_thread_handle) =
325 ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
326 let core_dispatcher = Arc::new(core_dispatcher);
327 let leader_timeout_handle =
328 LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());
329
330 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
331
332 let synchronizer = Synchronizer::start(
333 network_client.clone(),
334 context.clone(),
335 core_dispatcher.clone(),
336 commit_vote_monitor.clone(),
337 block_verifier.clone(),
338 transaction_certifier.clone(),
339 dag_state.clone(),
340 sync_last_known_own_block,
341 );
342
343 let commit_syncer_handle = CommitSyncer::new(
344 context.clone(),
345 core_dispatcher.clone(),
346 commit_vote_monitor.clone(),
347 commit_consumer_monitor.clone(),
348 block_verifier.clone(),
349 transaction_certifier.clone(),
350 network_client.clone(),
351 dag_state.clone(),
352 )
353 .start();
354
355 let round_prober_handle = RoundProber::new(
356 context.clone(),
357 core_dispatcher.clone(),
358 round_tracker.clone(),
359 dag_state.clone(),
360 network_client.clone(),
361 )
362 .start();
363
364 let network_service = Arc::new(AuthorityService::new(
365 context.clone(),
366 block_verifier,
367 commit_vote_monitor,
368 round_tracker.clone(),
369 synchronizer.clone(),
370 core_dispatcher,
371 signals_receivers.block_broadcast_receiver(),
372 transaction_certifier,
373 dag_state.clone(),
374 store,
375 ));
376
377 let subscriber = if N::Client::SUPPORT_STREAMING {
378 let s = Subscriber::new(
379 context.clone(),
380 network_client,
381 network_service.clone(),
382 dag_state,
383 );
384 for (peer, _) in context.committee.authorities() {
385 if peer != context.own_index {
386 s.subscribe(peer);
387 }
388 }
389 Some(s)
390 } else {
391 None
392 };
393
394 network_manager.install_service(network_service).await;
395
396 info!(
397 "Consensus authority started, took {:?}",
398 start_time.elapsed()
399 );
400
401 Self {
402 context,
403 start_time,
404 transaction_client: Arc::new(tx_client),
405 synchronizer,
406 commit_syncer_handle,
407 round_prober_handle,
408 proposed_block_handler,
409 leader_timeout_handle,
410 core_thread_handle,
411 broadcaster,
412 subscriber,
413 network_manager,
414 sync_last_known_own_block,
415 }
416 }
417
418 pub(crate) async fn stop(mut self) {
419 info!(
420 "Stopping authority. Total run time: {:?}",
421 self.start_time.elapsed()
422 );
423
424 if let Err(e) = self.synchronizer.stop().await {
426 if e.is_panic() {
427 std::panic::resume_unwind(e.into_panic());
428 }
429 warn!(
430 "Failed to stop synchronizer when shutting down consensus: {:?}",
431 e
432 );
433 };
434 self.commit_syncer_handle.stop().await;
435 self.round_prober_handle.stop().await;
436 self.proposed_block_handler.abort();
437 self.leader_timeout_handle.stop().await;
438 self.core_thread_handle.stop().await;
441 if let Some(mut broadcaster) = self.broadcaster.take() {
442 broadcaster.stop();
443 }
444 if let Some(subscriber) = self.subscriber.take() {
446 subscriber.stop();
447 }
448 self.network_manager.stop().await;
449
450 self.context
451 .metrics
452 .node_metrics
453 .uptime
454 .observe(self.start_time.elapsed().as_secs_f64());
455 }
456
457 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
458 self.transaction_client.clone()
459 }
460}
461
#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use consensus_config::{Parameters, local_committee_and_keys};
    use mysten_metrics::RegistryService;
    use mysten_metrics::monitored_mpsc::UnboundedReceiver;
    use prometheus::Registry;
    use rstest::rstest;
    use sui_protocol_config::ProtocolConfig;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};
    use typed_store::DBMetrics;

    use super::*;
    use crate::{
        CommittedSubDag,
        block::{BlockAPI as _, CertifiedBlocksOutput, GENESIS_ROUND},
        transaction::NoopTransactionVerifier,
    };

    /// Starts a single-authority committee on each network type, checks the context it
    /// exposes, and stops it.
    #[rstest]
    #[tokio::test]
    async fn test_authority_start_and_stop(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
    ) {
        let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
        let registry = Registry::new();

        let temp_dir = TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.keep(),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let own_index = committee.to_authority_index(0).unwrap();
        let protocol_keypair = keypairs[own_index].1.clone();
        let network_keypair = keypairs[own_index].0.clone();

        let (commit_consumer, _, _) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            own_index,
            committee,
            parameters,
            ProtocolConfig::get_for_max_version_UNSAFE(),
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            0,
        )
        .await;

        assert_eq!(authority.context().own_index, own_index);
        assert_eq!(authority.context().committee.epoch(), 0);
        assert_eq!(authority.context().committee.size(), 1);

        authority.stop().await;
    }

    /// Runs a committee of four authorities, submits transactions round-robin, checks that
    /// every authority commits each submitted transaction exactly once, then stops and
    /// restarts authority 1 on its existing database directory.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_authority_committee(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(5, 10)] gc_depth: u32,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        let temp_dirs = (0..NUM_OF_AUTHORITIES)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut commit_receivers = Vec::with_capacity(committee.size());
        let mut block_receivers = Vec::with_capacity(committee.size());
        let mut authorities = Vec::with_capacity(committee.size());
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.push(authority);
        }

        // Submit distinct transactions, spreading them across the authorities.
        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        // Every authority must commit each submitted transaction exactly once.
        for receiver in &mut commit_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 1.
        let index = committee.to_authority_index(1).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 1 on the same directory with its incremented boot counter.
        let (authority, commit_receiver, block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        commit_receivers[index] = commit_receiver;
        block_receivers[index] = block_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// Same scenario as `test_authority_committee` but for committees of 1, 2 and 3
    /// authorities, restarting authority 0.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_small_committee(
        #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
        #[values(1, 2, 3)] num_authorities: usize,
    ) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
        let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

        let temp_dirs = (0..num_authorities)
            .map(|_| TempDir::new().unwrap())
            .collect::<Vec<_>>();

        let mut output_receivers = Vec::with_capacity(committee.size());
        let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
        let mut boot_counters = vec![0; num_authorities];

        for (index, _authority_info) in committee.authorities() {
            let (authority, commit_receiver, _block_receiver) = make_authority(
                index,
                &temp_dirs[index.value()],
                committee.clone(),
                keypairs.clone(),
                network_type,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            output_receivers.push(commit_receiver);
            authorities.push(authority);
        }

        // Submit distinct transactions, spreading them across the authorities.
        const NUM_TRANSACTIONS: u8 = 15;
        let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
        for i in 0..NUM_TRANSACTIONS {
            let txn = vec![i; 16];
            submitted_transactions.insert(txn.clone());
            authorities[i as usize % authorities.len()]
                .transaction_client()
                .submit(vec![txn])
                .await
                .unwrap();
        }

        // Every authority must commit each submitted transaction exactly once.
        for receiver in &mut output_receivers {
            let mut expected_transactions = submitted_transactions.clone();
            loop {
                let committed_subdag =
                    tokio::time::timeout(Duration::from_secs(1), receiver.recv())
                        .await
                        .unwrap()
                        .unwrap();
                for b in committed_subdag.blocks {
                    for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
                        assert!(
                            expected_transactions.remove(&txn),
                            "Transaction not submitted or already seen: {:?}",
                            txn
                        );
                    }
                }
                assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
                if expected_transactions.is_empty() {
                    break;
                }
            }
        }

        // Stop authority 0.
        let index = committee.to_authority_index(0).unwrap();
        authorities.remove(index.value()).stop().await;
        sleep(Duration::from_secs(10)).await;

        // Restart authority 0 on the same directory with its incremented boot counter.
        let (authority, commit_receiver, _block_receiver) = make_authority(
            index,
            &temp_dirs[index.value()],
            committee.clone(),
            keypairs.clone(),
            network_type,
            boot_counters[index],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index] += 1;
        output_receivers[index] = commit_receiver;
        authorities.insert(index.value(), authority);
        sleep(Duration::from_secs(10)).await;

        // Stop all authorities and exit.
        for authority in authorities {
            authority.stop().await;
        }
    }

    /// Restarts authority 1 with a fresh database directory and a reset boot counter, and
    /// authority 2 with its existing directory, then waits for a committed block authored
    /// by authority 1 on the restarted authority 1's commit stream.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut block_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        protocol_config.set_consensus_gc_depth_for_testing(gc_depth);

        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver, block_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                ConsensusNetwork::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            assert!(
                authority.sync_last_known_own_block_enabled(),
                "Expected syncing of last known own block to be enabled as all authorities are of empty db and boot for first time."
            );
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            block_receivers.push(block_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Wait until authority 1 has committed at least one non-genesis block that it
        // authored itself.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 and 2.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Restart authority 1 with a new, empty directory and a boot counter reset to 0.
        let dir = TempDir::new().unwrap();
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver, _block_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            ConsensusNetwork::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        assert!(
            authority.sync_last_known_own_block_enabled(),
            "Authority should have the sync of last own block enabled"
        );
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Restart authority 2 on its earlier directory and with its incremented boot
        // counter; the sync of the last own block must not be enabled for it.
        let (authority, _commit_receiver, _block_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            ConsensusNetwork::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        assert!(
            !authority.sync_last_known_own_block_enabled(),
            "Authority should not have attempted to sync the last own block"
        );
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait for a committed non-genesis block authored by authority 1 on the restarted
        // authority's commit stream. The wait is bounded, like the first wait above, so a
        // failed recovery fails the test instead of hanging it.
        'outer: while let Some(result) = timeout(Duration::from_secs(10), commit_receiver.recv())
            .await
            .expect("Timed out while waiting for a committed block from restarted authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop all authorities and exit.
        for (_, authority) in authorities {
            authority.stop().await;
        }
    }

    /// Starts a `ConsensusAuthority` for `index` with its database under `db_dir`, and
    /// returns it together with the commit and block receivers produced by
    /// `CommitConsumerArgs::new(0, 0)`.
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: ConsensusNetwork,
        boot_counter: u64,
        protocol_config: ProtocolConfig,
    ) -> (
        ConsensusAuthority,
        UnboundedReceiver<CommittedSubDag>,
        UnboundedReceiver<CertifiedBlocksOutput>,
    ) {
        let registry = Registry::new();

        // Test-specific overrides of the default parameters.
        let parameters = Parameters {
            db_path: db_dir.path().to_path_buf(),
            dag_state_cached_rounds: 5,
            commit_sync_parallel_fetches: 2,
            commit_sync_batch_size: 3,
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        };
        let txn_verifier = NoopTransactionVerifier {};

        let protocol_keypair = keypairs[index].1.clone();
        let network_keypair = keypairs[index].0.clone();

        let (commit_consumer, commit_receiver, block_receiver) = CommitConsumerArgs::new(0, 0);

        let authority = ConsensusAuthority::start(
            network_type,
            0,
            index,
            committee,
            parameters,
            protocol_config,
            protocol_keypair,
            network_keypair,
            Arc::new(Clock::default()),
            Arc::new(txn_verifier),
            commit_consumer,
            registry,
            boot_counter,
        )
        .await;

        (authority, commit_receiver, block_receiver)
    }
}