1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7 Committee, ConsensusProtocolConfig, NetworkKeyPair, NetworkPublicKey, Parameters,
8 ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18 BlockAPI as _, CommitConsumerArgs,
19 authority_service::AuthorityService,
20 block_manager::BlockManager,
21 block_sync_service::BlockSyncService,
22 block_verifier::SignedBlockVerifier,
23 commit_observer::CommitObserver,
24 commit_syncer::{CommitSyncer, CommitSyncerHandle},
25 commit_vote_monitor::CommitVoteMonitor,
26 context::{Clock, Context},
27 core::{Core, CoreSignals},
28 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
29 dag_state::DagState,
30 leader_schedule::LeaderSchedule,
31 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
32 metrics::initialise_metrics,
33 network::{
34 CommitSyncerClient, NetworkManager, PeerId, SynchronizerClient, tonic_network::TonicManager,
35 },
36 observer_service::ObserverService,
37 observer_subscriber::ObserverSubscriber,
38 peers_pool::PeersPool,
39 round_prober::{RoundProber, RoundProberHandle},
40 round_tracker::RoundTracker,
41 storage::rocksdb_store::RocksDBStore,
42 subscriber::Subscriber,
43 synchronizer::{Synchronizer, SynchronizerHandle},
44 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
45 transaction_vote_tracker::TransactionVoteTracker,
46};
47
/// Public handle to a running consensus authority.
///
/// Each variant wraps an [`AuthorityNode`] specialised for one network
/// implementation; `WithTonic` is the only variant defined here.
// `AuthorityNode` is `pub(crate)` while this enum is `pub`, hence the allow.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Authority node whose network manager is [`TonicManager`].
    WithTonic(AuthorityNode<TonicManager>),
}
54
55impl ConsensusAuthority {
56 pub async fn start(
57 network_type: NetworkType,
58 epoch_start_timestamp_ms: u64,
59 committee: Committee,
60 parameters: Parameters,
61 protocol_config: ConsensusProtocolConfig,
62 protocol_keypair: Option<ProtocolKeyPair>,
64 network_keypair: NetworkKeyPair,
65 clock: Arc<Clock>,
66 transaction_verifier: Arc<dyn TransactionVerifier>,
67 commit_consumer: CommitConsumerArgs,
68 registry: Registry,
69 boot_counter: u64,
73 ) -> Self {
74 match network_type {
75 NetworkType::Tonic => {
76 let authority = AuthorityNode::start(
77 epoch_start_timestamp_ms,
78 committee,
79 parameters,
80 protocol_config,
81 protocol_keypair,
82 network_keypair,
83 clock,
84 transaction_verifier,
85 commit_consumer,
86 registry,
87 boot_counter,
88 )
89 .await;
90 Self::WithTonic(authority)
91 }
92 }
93 }
94
95 pub async fn stop(self) {
96 match self {
97 Self::WithTonic(authority) => authority.stop().await,
98 }
99 }
100
101 pub fn update_peer_address(
102 &self,
103 network_pubkey: NetworkPublicKey,
104 address: Option<Multiaddr>,
105 ) {
106 match self {
107 Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
108 }
109 }
110
111 pub fn transaction_client(&self) -> Arc<TransactionClient> {
112 match self {
113 Self::WithTonic(authority) => authority.transaction_client(),
114 }
115 }
116
117 #[cfg(test)]
118 fn context(&self) -> &Arc<Context> {
119 match self {
120 Self::WithTonic(authority) => &authority.context,
121 }
122 }
123}
124
/// Selects which network implementation [`ConsensusAuthority::start`] uses.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    /// Start the authority with the tonic-based network manager.
    Tonic,
}
129
/// Block subscriber owned by an [`AuthorityNode`].
///
/// `AuthorityNode::start` builds the `Validator` variant when the context
/// reports a validator, and the `Observer` variant otherwise.
enum SubscriberType<N: NetworkManager> {
    /// Subscriber built from the validator client and the [`AuthorityService`].
    Validator(Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>),
    /// Subscriber built from the observer client and the [`ObserverService`].
    Observer(ObserverSubscriber<N::ObserverClient, ObserverService>),
}
135
impl<N: NetworkManager> SubscriberType<N> {
    /// Stops whichever subscriber this value wraps.
    fn stop(&self) {
        match self {
            Self::Validator(inner) => inner.stop(),
            Self::Observer(inner) => inner.stop(),
        }
    }
}
144
/// A running consensus node, generic over the network manager implementation.
///
/// Holds the handles of the components started by `AuthorityNode::start`, so
/// that `AuthorityNode::stop` can shut them down.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager,
{
    /// Shared node context (committee, parameters, metrics, ...).
    context: Arc<Context>,
    /// Captured during start-up; used for the run-time log line and the uptime metric.
    start_time: Instant,
    /// Client handed out to callers via `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Handle of the started synchronizer.
    synchronizer: Arc<SynchronizerHandle>,

    /// Handle of the started commit syncer.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle of the round prober; `None` when the node started on the observer path.
    round_prober_handle: Option<RoundProberHandle>,
    /// Handle of the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle of the core thread.
    core_thread_handle: CoreThreadHandle,
    /// Validator or observer block subscriber.
    subscriber: SubscriberType<N>,
    /// Network manager that owns the servers started for this node.
    network_manager: N,
}
161
162impl<N> AuthorityNode<N>
163where
164 N: NetworkManager,
165{
    /// Builds and starts every component of a consensus node and returns the running node.
    ///
    /// When `protocol_keypair` is `Some`, the committee member with the matching protocol
    /// public key becomes this node's own index. When it is `None`, no own index is passed
    /// to [`Context::new`] and the start-up log identifies the node as an observer. The
    /// later validator/observer branches are selected by `context.is_validator()`.
    ///
    /// `boot_counter` is logged and, together with
    /// `parameters.sync_last_known_own_block_timeout`, decides whether the
    /// "sync last known own block" flag is enabled (only when the counter is 0, the
    /// timeout is non-zero and the node is a validator).
    ///
    /// # Panics
    ///
    /// - If `protocol_keypair` is provided but its public key is not in `committee`.
    /// - If `parameters.db_path` is not valid UTF-8.
    pub(crate) async fn start(
        epoch_start_timestamp_ms: u64,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ConsensusProtocolConfig,
        protocol_keypair: Option<ProtocolKeyPair>,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumerArgs,
        registry: Registry,
        boot_counter: u64,
    ) -> Self {
        let metrics = initialise_metrics(registry);

        // Resolve this node's own authority index, if it has a protocol keypair.
        let own_index = if let Some(protocol_keypair) = &protocol_keypair {
            // Look the node up in the committee by its protocol public key.
            let (own_index, _) = committee
                .authorities()
                .find(|(_, a)| a.protocol_key == protocol_keypair.public())
                .expect("Own authority should be among the consensus authorities!");

            let own_hostname = committee.authority(own_index).hostname.clone();
            info!(
                "Starting consensus validator authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                own_index,
                own_hostname,
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );

            // Publish the authority index as a metric labelled with the hostname.
            metrics
                .node_metrics
                .authority_index
                .with_label_values(&[&own_hostname])
                .set(own_index.value() as i64);
            Some(own_index)
        } else {
            // No protocol keypair: the node has no own index in the committee.
            info!(
                "Starting consensus observer authority, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );
            None
        };

        info!(
            "Consensus authorities: {}",
            committee
                .authorities()
                .map(|(i, a)| format!("{}: {}", i, a.hostname))
                .join(", ")
        );
        info!("Consensus parameters: {:?}", parameters);
        info!("Consensus committee: {:?}", committee);
        let context = Arc::new(Context::new(
            epoch_start_timestamp_ms,
            own_index,
            committee,
            parameters,
            protocol_config,
            metrics,
            clock,
        ));
        // Reference point for the "took ..." log below and for the uptime metric in `stop`.
        let start_time = Instant::now();

        context
            .metrics
            .node_metrics
            .protocol_version
            .set(context.protocol_config.protocol_version() as i64);

        // Transaction submission channel: the client is handed to callers, the consumer to Core.
        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

        // Network manager and the two clients used by the syncing components.
        let mut network_manager = N::new(context.clone(), network_keypair);
        let validator_client = network_manager.validator_client();
        let observer_client = network_manager.observer_client();

        let synchronizer_client = Arc::new(SynchronizerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));
        let commit_syncer_client = Arc::new(CommitSyncerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));

        // Persistent store and the DAG state on top of it.
        // NOTE: the `unwrap` panics when the configured DB path is not valid UTF-8.
        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
        let store = Arc::new(RocksDBStore::new(
            store_path,
            context.parameters.use_fifo_compaction,
        ));
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_verifier = Arc::new(SignedBlockVerifier::new(
            context.clone(),
            transaction_verifier,
        ));

        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());

        // Enabled only on the first boot (counter 0), with a non-zero timeout, for validators.
        let sync_last_known_own_block = boot_counter == 0
            && !context
                .parameters
                .sync_last_known_own_block_timeout
                .is_zero()
            && context.is_validator();
        info!(
            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
            sync_last_known_own_block,
            boot_counter,
            context.parameters.sync_last_known_own_block_timeout
        );

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        // Grab the monitor before `commit_consumer` is moved into the commit observer.
        let commit_consumer_monitor = commit_consumer.monitor();
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Seed the round tracker with the round of the last cached block of each authority.
        let initial_received_rounds = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX)
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
            context.clone(),
            initial_received_rounds,
        )));

        // Validators get a full Core (with transaction consumer and protocol keypair);
        // observers get the reduced observer Core.
        let core = if context.is_validator() {
            Core::new_validator(
                context.clone(),
                leader_schedule,
                tx_consumer,
                transaction_vote_tracker.clone(),
                block_manager,
                commit_observer,
                core_signals,
                protocol_keypair.expect("protocol keypair is required when running as validator"),
                dag_state.clone(),
                sync_last_known_own_block,
                round_tracker.clone(),
            )
        } else {
            Core::new_observer(
                context.clone(),
                leader_schedule,
                block_manager,
                commit_observer,
                core_signals,
                dag_state.clone(),
            )
        };

        // Start the core thread; other components talk to Core through the dispatcher.
        let (core_dispatcher, core_thread_handle) =
            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
        let core_dispatcher = Arc::new(core_dispatcher);
        let leader_timeout_handle =
            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());

        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // Shared by the synchronizer and the commit syncer.
        let peers_pool = Arc::new(PeersPool::new(context.clone()));

        let synchronizer = Synchronizer::start(
            synchronizer_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            sync_last_known_own_block,
        );

        let commit_syncer_handle = CommitSyncer::new(
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            commit_syncer_client.clone(),
            dag_state.clone(),
            peers_pool.clone(),
        )
        .start();

        // Shared by the validator and observer services below.
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));

        let (subscriber, round_prober_handle) = if context.is_validator() {
            // Validator path: serve the authority service, subscribe to every other
            // committee member, start the round prober and optionally the observer server.
            let authority_service = Arc::new(AuthorityService::new(
                context.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                round_tracker.clone(),
                synchronizer.clone(),
                core_dispatcher.clone(),
                signals_receivers.block_broadcast_receiver(),
                transaction_vote_tracker.clone(),
                dag_state.clone(),
                block_sync_service.clone(),
            ));

            network_manager
                .start_validator_server(authority_service.clone())
                .await;

            let s = Subscriber::new(
                context.clone(),
                validator_client.clone(),
                authority_service.clone(),
                dag_state.clone(),
            );
            // Subscribe to all committee members except ourselves.
            for (peer, _) in context.committee.authorities() {
                if peer != context.own_index {
                    s.subscribe(peer);
                }
            }

            let round_prober_handle = Some(
                RoundProber::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    round_tracker.clone(),
                    dag_state.clone(),
                    validator_client,
                )
                .start(),
            );

            // Validators only serve observers when the observer server is enabled in parameters.
            if context.parameters.observer.is_server_enabled() {
                let observer_service = Arc::new(ObserverService::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    dag_state.clone(),
                    signals_receivers.accepted_block_broadcast_receiver(),
                    block_verifier,
                    commit_vote_monitor.clone(),
                    transaction_vote_tracker.clone(),
                    synchronizer.clone(),
                    block_sync_service.clone(),
                ));
                network_manager
                    .start_observer_server(observer_service)
                    .await;
            }

            (SubscriberType::Validator(s), round_prober_handle)
        } else {
            // Observer path: always start the observer server and subscribe to at most
            // one configured peer; no round prober is started.
            let observer_client = network_manager.observer_client();
            let observer_service = Arc::new(ObserverService::new(
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                signals_receivers.accepted_block_broadcast_receiver(),
                block_verifier,
                commit_vote_monitor.clone(),
                transaction_vote_tracker.clone(),
                synchronizer.clone(),
                block_sync_service.clone(),
            ));

            let observer_subscriber = ObserverSubscriber::new(
                context.clone(),
                observer_client,
                observer_service.clone(),
                dag_state.clone(),
            );

            network_manager
                .start_observer_server(observer_service)
                .await;

            // Only the first configured peer is used (`take(1)`).
            for peer_record in context.parameters.observer.peers.iter().take(1) {
                // A peer whose network key belongs to a committee member is addressed as a
                // validator; any other key is treated as another observer.
                let peer_id = if let Some((index, _)) = context
                    .committee
                    .authorities()
                    .find(|(_, authority)| authority.network_key == peer_record.public_key)
                {
                    PeerId::Validator(index)
                } else {
                    PeerId::Observer(Box::new(peer_record.public_key.clone()))
                };

                info!("Observer subscribing to peer: {:?}", peer_id);
                observer_subscriber.subscribe(peer_id);
            }

            (SubscriberType::Observer(observer_subscriber), None)
        };

        info!(
            "Consensus authority started, took {:?}",
            start_time.elapsed()
        );

        Self {
            context,
            start_time,
            transaction_client: Arc::new(tx_client),
            synchronizer,
            commit_syncer_handle,
            round_prober_handle,
            leader_timeout_handle,
            core_thread_handle,
            subscriber,
            network_manager,
        }
    }
528
529 pub(crate) async fn stop(mut self) {
530 info!(
531 "Stopping authority. Total run time: {:?}",
532 self.start_time.elapsed()
533 );
534
535 if let Err(e) = self.synchronizer.stop().await {
537 if e.is_panic() {
538 std::panic::resume_unwind(e.into_panic());
539 }
540 warn!(
541 "Failed to stop synchronizer when shutting down consensus: {:?}",
542 e
543 );
544 };
545 self.commit_syncer_handle.stop().await;
546 if let Some(round_prober_handle) = self.round_prober_handle {
547 round_prober_handle.stop().await;
548 }
549 self.leader_timeout_handle.stop().await;
550 self.core_thread_handle.stop().await;
552 self.subscriber.stop();
554 self.network_manager.stop().await;
555
556 self.context
557 .metrics
558 .node_metrics
559 .uptime
560 .observe(self.start_time.elapsed().as_secs_f64());
561 }
562
563 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
564 self.transaction_client.clone()
565 }
566
567 pub(crate) fn update_peer_address(
568 &self,
569 network_pubkey: NetworkPublicKey,
570 address: Option<Multiaddr>,
571 ) {
572 let Some(peer) = self
574 .context
575 .committee
576 .authorities()
577 .find(|(_, authority)| authority.network_key == network_pubkey)
578 .map(|(index, _)| index)
579 else {
580 warn!(
581 "Network public key {:?} not found in committee, ignoring address update",
582 network_pubkey
583 );
584 return;
585 };
586
587 self.network_manager.update_peer_address(peer, address);
589
590 if peer != self.context.own_index {
592 info!("Re-subscribing to peer {} after address update", peer);
593 match &self.subscriber {
594 SubscriberType::Validator(s) => s.subscribe(peer),
595 SubscriberType::Observer(s) => {
596 s.subscribe(PeerId::Validator(peer));
598 }
599 }
600 }
601 }
602}
603
604#[cfg(test)]
605mod tests {
606 #![allow(non_snake_case)]
607
608 use std::{
609 collections::{BTreeMap, BTreeSet},
610 sync::Arc,
611 time::Duration,
612 };
613
614 use consensus_config::{
615 AuthorityIndex, ObserverParameters, Parameters, PeerRecord, local_committee_and_keys,
616 };
617 use mysten_metrics::RegistryService;
618 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
619 use prometheus::Registry;
620 use rand::{SeedableRng, rngs::StdRng};
621 use rstest::rstest;
622 use tempfile::TempDir;
623 use tokio::time::{sleep, timeout};
624 use typed_store::DBMetrics;
625
626 use super::*;
627 use crate::{
628 CommittedSubDag,
629 block::{BlockAPI as _, GENESIS_ROUND},
630 transaction::NoopTransactionVerifier,
631 };
632
633 #[rstest]
634 #[tokio::test]
635 async fn test_authority_start_and_stop(
636 #[values(NetworkType::Tonic)] network_type: NetworkType,
637 ) {
638 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
639 let registry = Registry::new();
640
641 let temp_dir = TempDir::new().unwrap();
642 let parameters = Parameters {
643 db_path: temp_dir.keep(),
644 ..Default::default()
645 };
646 let txn_verifier = NoopTransactionVerifier {};
647
648 let own_index = committee.to_authority_index(0).unwrap();
649 let protocol_keypair = keypairs[own_index].1.clone();
650 let network_keypair = keypairs[own_index].0.clone();
651
652 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
653
654 let authority = ConsensusAuthority::start(
655 network_type,
656 0,
657 committee,
658 parameters,
659 ConsensusProtocolConfig::for_testing(),
660 Some(protocol_keypair),
661 network_keypair,
662 Arc::new(Clock::default()),
663 Arc::new(txn_verifier),
664 commit_consumer,
665 registry,
666 0,
667 )
668 .await;
669
670 assert_eq!(authority.context().own_index, own_index);
671 assert_eq!(authority.context().committee.epoch(), 0);
672 assert_eq!(authority.context().committee.size(), 1);
673
674 authority.stop().await;
675 }
676
677 #[rstest]
678 #[tokio::test]
679 async fn test_observer_start_and_stop(#[values(NetworkType::Tonic)] network_type: NetworkType) {
680 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
681 let registry = Registry::new();
682
683 let temp_dir = TempDir::new().unwrap();
684 let parameters = Parameters {
685 db_path: temp_dir.keep(),
686 ..Default::default()
687 };
688 let txn_verifier = NoopTransactionVerifier {};
689
690 let network_keypair = keypairs[0].0.clone();
692
693 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
694
695 let observer = ConsensusAuthority::start(
696 network_type,
697 0,
698 committee.clone(),
699 parameters,
700 ConsensusProtocolConfig::for_testing(),
701 None, network_keypair,
703 Arc::new(Clock::default()),
704 Arc::new(txn_verifier),
705 commit_consumer,
706 registry,
707 0,
708 )
709 .await;
710
711 sleep(Duration::from_secs(2)).await;
712
713 assert_eq!(observer.context().own_index, AuthorityIndex::MAX);
715 assert_eq!(observer.context().committee.epoch(), 0);
716 assert_eq!(observer.context().committee.size(), 1);
717 assert!(!observer.context().is_validator());
718
719 observer.stop().await;
720 }
721
722 #[rstest]
727 #[tokio::test(flavor = "current_thread")]
728 async fn test_authority_committee(
729 #[values(NetworkType::Tonic)] network_type: NetworkType,
730 #[values(5, 10)] gc_depth: u32,
731 ) {
732 telemetry_subscribers::init_for_testing();
733 let db_registry = Registry::new();
734 DBMetrics::init(RegistryService::new(db_registry));
735
736 const NUM_OF_AUTHORITIES: usize = 4;
737 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
738 let mut protocol_config = ConsensusProtocolConfig::for_testing();
739 protocol_config.set_gc_depth_for_testing(gc_depth);
740
741 let temp_dirs = (0..NUM_OF_AUTHORITIES)
742 .map(|_| TempDir::new().unwrap())
743 .collect::<Vec<_>>();
744
745 let mut commit_receivers = Vec::with_capacity(committee.size());
746 let mut authorities = Vec::with_capacity(committee.size());
747 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
748
749 let observer_server_port = 8900 + gc_depth as u16;
751
752 let mut authority_0_network_key = None;
754 for (index, authority_info) in committee.authorities() {
755 let (authority, commit_receiver) = if index.value() == 0 {
756 authority_0_network_key = Some(authority_info.network_key.clone());
758 make_authority_with_observer_server(
760 index,
761 &temp_dirs[index.value()],
762 committee.clone(),
763 keypairs.clone(),
764 network_type,
765 boot_counters[index],
766 protocol_config.clone(),
767 Some(observer_server_port),
768 )
769 .await
770 } else {
771 make_authority(
772 index,
773 &temp_dirs[index.value()],
774 committee.clone(),
775 keypairs.clone(),
776 network_type,
777 boot_counters[index],
778 protocol_config.clone(),
779 )
780 .await
781 };
782 boot_counters[index] += 1;
783 commit_receivers.push(commit_receiver);
784 authorities.push(authority);
785 }
786
787 let observer_temp_dir = TempDir::new().unwrap();
789 let mut rng = StdRng::from_seed([99; 32]);
790 let observer_network_keypair = consensus_config::NetworkKeyPair::generate(&mut rng);
791
792 let observer_parameters = Parameters {
793 db_path: observer_temp_dir.path().to_path_buf(),
794 observer: ObserverParameters {
795 peers: vec![PeerRecord {
797 public_key: authority_0_network_key
798 .clone()
799 .expect("Authority 0 network key should be set"),
800 address: format!("/ip4/127.0.0.1/udp/{}", observer_server_port)
801 .parse()
802 .unwrap(),
803 }],
804 ..Default::default()
805 },
806 ..Default::default()
807 };
808
809 let (observer_commit_consumer, observer_commit_receiver) = CommitConsumerArgs::new(0, 0);
810 let observer = ConsensusAuthority::start(
811 network_type,
812 0,
813 committee.clone(),
814 observer_parameters,
815 protocol_config.clone(),
816 None, observer_network_keypair,
818 Arc::new(Clock::default()),
819 Arc::new(NoopTransactionVerifier {}),
820 observer_commit_consumer,
821 Registry::new(),
822 0,
823 )
824 .await;
825 commit_receivers.push(observer_commit_receiver);
828
829 sleep(Duration::from_secs(5)).await;
831
832 const NUM_TRANSACTIONS: u8 = 15;
833 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
834 for i in 0..NUM_TRANSACTIONS {
835 let txn = vec![i; 16];
836 submitted_transactions.insert(txn.clone());
837 authorities[i as usize % authorities.len()]
838 .transaction_client()
839 .submit(vec![txn])
840 .await
841 .unwrap();
842 }
843
844 for receiver in &mut commit_receivers {
845 let mut expected_transactions = submitted_transactions.clone();
846 loop {
847 let committed_subdag =
848 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
849 .await
850 .unwrap()
851 .unwrap();
852 for b in committed_subdag.blocks {
853 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
854 assert!(
855 expected_transactions.remove(&txn),
856 "Transaction not submitted or already seen: {:?}",
857 txn
858 );
859 }
860 }
861 if expected_transactions.is_empty() {
862 break;
863 }
864 }
865 }
866
867 let index = committee.to_authority_index(1).unwrap();
869 authorities.remove(index.value()).stop().await;
870 sleep(Duration::from_secs(10)).await;
871
872 let (authority, commit_receiver) = make_authority(
874 index,
875 &temp_dirs[index.value()],
876 committee.clone(),
877 keypairs.clone(),
878 network_type,
879 boot_counters[index],
880 protocol_config.clone(),
881 )
882 .await;
883 boot_counters[index] += 1;
884 commit_receivers[index] = commit_receiver;
885 authorities.insert(index.value(), authority);
886 sleep(Duration::from_secs(10)).await;
887
888 let observer_context = observer.context();
893 assert!(
894 observer_context.is_observer(),
895 "It should be an observer node"
896 );
897
898 let authority_0 = &authorities[0];
900 let authority_0_context = authority_0.context();
901 let mut authority_0_total_verified_blocks = 0;
902
903 for (_, authority_info) in committee.authorities() {
905 if let Ok(metric) = authority_0_context
906 .metrics
907 .node_metrics
908 .verified_blocks
909 .get_metric_with_label_values(&[&authority_info.hostname])
910 {
911 authority_0_total_verified_blocks += metric.get();
912 println!(
913 "authority_info.hostname: {}, metric: {:?}",
914 authority_info.hostname, authority_0_total_verified_blocks
915 );
916 }
917 }
918
919 let mut authority_0_total_proposed_blocks = 0;
920 for force in [true, false] {
921 if let Ok(metric) = authority_0_context
922 .metrics
923 .node_metrics
924 .proposed_blocks
925 .get_metric_with_label_values(&[&force.to_string()])
926 {
927 authority_0_total_proposed_blocks += metric.get();
928 }
929 }
930
931 authority_0_total_verified_blocks += authority_0_total_proposed_blocks;
932
933 let mut observer_received_blocks = 0;
935 for (_, authority_info) in committee.authorities() {
936 if let Ok(metric) = observer_context
937 .metrics
938 .node_metrics
939 .verified_blocks
940 .get_metric_with_label_values(&[&authority_info.hostname])
941 {
942 observer_received_blocks += metric.get();
943 }
944 }
945
946 assert!(
949 observer_received_blocks > 0,
950 "Observer should have received at least some blocks, got: {}",
951 observer_received_blocks
952 );
953
954 println!(
955 "authority_0_total_verified_blocks: {}, observer_received_blocks: {}",
956 authority_0_total_verified_blocks, observer_received_blocks
957 );
958
959 const TOLERANCE: u64 = 20;
960 assert!(
961 authority_0_total_verified_blocks - observer_received_blocks <= TOLERANCE,
962 "The number of blocks received by the observer ({}) should be close to the number of blocks verified by authority 0 ({})",
963 observer_received_blocks,
964 authority_0_total_verified_blocks,
965 );
966
967 observer.stop().await;
969
970 for authority in authorities {
972 authority.stop().await;
973 }
974 }
975
976 #[rstest]
977 #[tokio::test(flavor = "current_thread")]
978 async fn test_small_committee(
979 #[values(NetworkType::Tonic)] network_type: NetworkType,
980 #[values(1, 2, 3)] num_authorities: usize,
981 ) {
982 telemetry_subscribers::init_for_testing();
983 let db_registry = Registry::new();
984 DBMetrics::init(RegistryService::new(db_registry));
985
986 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
987 let protocol_config = ConsensusProtocolConfig::for_testing();
988
989 let temp_dirs = (0..num_authorities)
990 .map(|_| TempDir::new().unwrap())
991 .collect::<Vec<_>>();
992
993 let mut output_receivers = Vec::with_capacity(committee.size());
994 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
995 let mut boot_counters = vec![0; num_authorities];
996
997 for (index, _authority_info) in committee.authorities() {
998 let (authority, commit_receiver) = make_authority(
999 index,
1000 &temp_dirs[index.value()],
1001 committee.clone(),
1002 keypairs.clone(),
1003 network_type,
1004 boot_counters[index],
1005 protocol_config.clone(),
1006 )
1007 .await;
1008 boot_counters[index] += 1;
1009 output_receivers.push(commit_receiver);
1010 authorities.push(authority);
1011 }
1012
1013 const NUM_TRANSACTIONS: u8 = 15;
1014 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
1015 for i in 0..NUM_TRANSACTIONS {
1016 let txn = vec![i; 16];
1017 submitted_transactions.insert(txn.clone());
1018 authorities[i as usize % authorities.len()]
1019 .transaction_client()
1020 .submit(vec![txn])
1021 .await
1022 .unwrap();
1023 }
1024
1025 for receiver in &mut output_receivers {
1026 let mut expected_transactions = submitted_transactions.clone();
1027 loop {
1028 let committed_subdag =
1029 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
1030 .await
1031 .unwrap()
1032 .unwrap();
1033 for b in committed_subdag.blocks {
1034 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
1035 assert!(
1036 expected_transactions.remove(&txn),
1037 "Transaction not submitted or already seen: {:?}",
1038 txn
1039 );
1040 }
1041 }
1042 if expected_transactions.is_empty() {
1043 break;
1044 }
1045 }
1046 }
1047
1048 let index = committee.to_authority_index(0).unwrap();
1050 authorities.remove(index.value()).stop().await;
1051 sleep(Duration::from_secs(10)).await;
1052
1053 let (authority, commit_receiver) = make_authority(
1055 index,
1056 &temp_dirs[index.value()],
1057 committee.clone(),
1058 keypairs.clone(),
1059 network_type,
1060 boot_counters[index],
1061 protocol_config.clone(),
1062 )
1063 .await;
1064 boot_counters[index] += 1;
1065 output_receivers[index] = commit_receiver;
1066 authorities.insert(index.value(), authority);
1067 sleep(Duration::from_secs(10)).await;
1068
1069 for authority in authorities {
1071 authority.stop().await;
1072 }
1073 }
1074
    /// Checks that a validator restarted with an empty DB and boot counter 0 can commit its
    /// own blocks again.
    ///
    /// Steps:
    /// 1. Start four validators and wait until authority 1 has committed one of its own
    ///    non-genesis blocks.
    /// 2. Stop authorities 1 and 2.
    /// 3. Restart authority 1 on a fresh DB directory with its boot counter reset to 0.
    /// 4. Restart authority 2 on its original DB directory.
    /// 5. Wait until the restarted authority 1 commits a non-genesis block authored by itself.
    #[rstest]
    #[tokio::test(flavor = "current_thread")]
    async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
        telemetry_subscribers::init_for_testing();
        let db_registry = Registry::new();
        DBMetrics::init(RegistryService::new(db_registry));

        const NUM_OF_AUTHORITIES: usize = 4;
        let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
        let mut commit_receivers = vec![];
        let mut authorities = BTreeMap::new();
        let mut temp_dirs = BTreeMap::new();
        let mut boot_counters = [0; NUM_OF_AUTHORITIES];

        let mut protocol_config = ConsensusProtocolConfig::for_testing();
        protocol_config.set_gc_depth_for_testing(gc_depth);

        // Start all validators, each on its own temporary DB directory.
        for (index, _authority_info) in committee.authorities() {
            let dir = TempDir::new().unwrap();
            let (authority, commit_receiver) = make_authority(
                index,
                &dir,
                committee.clone(),
                keypairs.clone(),
                NetworkType::Tonic,
                boot_counters[index],
                protocol_config.clone(),
            )
            .await;
            boot_counters[index] += 1;
            commit_receivers.push(commit_receiver);
            authorities.insert(index, authority);
            temp_dirs.insert(index, dir);
        }

        // Wait (up to 10s per received commit) until authority 1 commits a non-genesis
        // block that it authored itself.
        let index_1 = committee.to_authority_index(1).unwrap();
        'outer: while let Some(result) =
            timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
                .await
                .expect("Timed out while waiting for at least one committed block from authority 1")
        {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        // Stop authorities 1 and 2.
        authorities.remove(&index_1).unwrap().stop().await;
        let index_2 = committee.to_authority_index(2).unwrap();
        authorities.remove(&index_2).unwrap().stop().await;
        sleep(Duration::from_secs(5)).await;

        // Restart authority 1 on a brand-new directory with boot counter 0. With the non-zero
        // `sync_last_known_own_block_timeout` set by the helper, this enables the
        // "sync last known own block" flag in `AuthorityNode::start`.
        let dir = TempDir::new().unwrap();
        boot_counters[index_1] = 0;
        let (authority, mut commit_receiver) = make_authority(
            index_1,
            &dir,
            committee.clone(),
            keypairs.clone(),
            NetworkType::Tonic,
            boot_counters[index_1],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_1] += 1;
        authorities.insert(index_1, authority);
        // Replacing the map entry drops the previous `TempDir` for authority 1.
        temp_dirs.insert(index_1, dir);
        sleep(Duration::from_secs(5)).await;

        // Restart authority 2 on its original directory with its incremented boot counter.
        let (authority, _commit_receiver) = make_authority(
            index_2,
            &temp_dirs[&index_2],
            committee.clone(),
            keypairs,
            NetworkType::Tonic,
            boot_counters[index_2],
            protocol_config.clone(),
        )
        .await;
        boot_counters[index_2] += 1;
        authorities.insert(index_2, authority);
        sleep(Duration::from_secs(5)).await;

        // Wait for the restarted authority 1 to commit a non-genesis block of its own.
        // NOTE(review): this loop has no timeout, unlike the first wait above.
        'outer: while let Some(result) = commit_receiver.recv().await {
            for block in result.blocks {
                if block.round() > GENESIS_ROUND && block.author() == index_1 {
                    break 'outer;
                }
            }
        }

        for (_, authority) in authorities {
            authority.stop().await;
        }
    }
1187
1188 async fn make_authority(
1190 index: AuthorityIndex,
1191 db_dir: &TempDir,
1192 committee: Committee,
1193 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1194 network_type: NetworkType,
1195 boot_counter: u64,
1196 protocol_config: ConsensusProtocolConfig,
1197 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1198 make_authority_with_observer_server(
1199 index,
1200 db_dir,
1201 committee,
1202 keypairs,
1203 network_type,
1204 boot_counter,
1205 protocol_config,
1206 None, )
1208 .await
1209 }
1210
1211 async fn make_authority_with_observer_server(
1212 index: AuthorityIndex,
1213 db_dir: &TempDir,
1214 committee: Committee,
1215 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1216 network_type: NetworkType,
1217 boot_counter: u64,
1218 protocol_config: ConsensusProtocolConfig,
1219 observer_server_port: Option<u16>,
1220 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1221 let registry = Registry::new();
1222
1223 let mut parameters = Parameters {
1225 db_path: db_dir.path().to_path_buf(),
1226 dag_state_cached_rounds: 5,
1227 commit_sync_parallel_fetches: 2,
1228 commit_sync_batch_size: 3,
1229 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1230 ..Default::default()
1231 };
1232
1233 if let Some(port) = observer_server_port {
1235 parameters.observer.server_port = Some(port);
1236 }
1237
1238 let txn_verifier = NoopTransactionVerifier {};
1239
1240 let protocol_keypair = keypairs[index].1.clone();
1241 let network_keypair = keypairs[index].0.clone();
1242
1243 let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
1244
1245 let authority = ConsensusAuthority::start(
1246 network_type,
1247 0,
1248 committee,
1249 parameters,
1250 protocol_config,
1251 Some(protocol_keypair),
1252 network_keypair,
1253 Arc::new(Clock::default()),
1254 Arc::new(txn_verifier),
1255 commit_consumer,
1256 registry,
1257 boot_counter,
1258 )
1259 .await;
1260
1261 (authority, commit_receiver)
1262 }
1263}