1use std::{sync::Arc, time::Instant};
5
6use consensus_config::{
7 Committee, ConsensusProtocolConfig, NetworkKeyPair, NetworkPublicKey, Parameters,
8 ProtocolKeyPair,
9};
10use consensus_types::block::Round;
11use itertools::Itertools;
12use mysten_network::Multiaddr;
13use parking_lot::RwLock;
14use prometheus::Registry;
15use tracing::{info, warn};
16
17use crate::{
18 BlockAPI as _, CommitConsumerArgs, RandomnessSignatureHandler,
19 authority_service::AuthorityService,
20 block_manager::BlockManager,
21 block_sync_service::BlockSyncService,
22 block_verifier::SignedBlockVerifier,
23 commit_observer::CommitObserver,
24 commit_syncer::{CommitSyncer, CommitSyncerHandle},
25 commit_vote_monitor::CommitVoteMonitor,
26 context::{Clock, Context},
27 core::{Core, CoreSignals},
28 core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
29 dag_state::DagState,
30 leader_schedule::LeaderSchedule,
31 leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
32 metrics::initialise_metrics,
33 network::{
34 CommitSyncerClient, NetworkManager, PeerId, SynchronizerClient, tonic_network::TonicManager,
35 },
36 observer_service::ObserverService,
37 observer_subscriber::ObserverSubscriber,
38 peers_pool::PeersPool,
39 round_prober::{RoundProber, RoundProberHandle},
40 round_tracker::RoundTracker,
41 storage::rocksdb_store::RocksDBStore,
42 subscriber::Subscriber,
43 synchronizer::{Synchronizer, SynchronizerHandle},
44 transaction::{TransactionClient, TransactionConsumer, TransactionVerifier},
45 transaction_vote_tracker::TransactionVoteTracker,
46};
47
/// Public handle to a started consensus node.
///
/// Each variant wraps an [`AuthorityNode`] specialised for one network
/// implementation; Tonic is currently the only variant.
// `AuthorityNode` is only `pub(crate)` but appears inside this `pub` enum,
// which is why the `private_interfaces` lint is allowed here.
#[allow(private_interfaces)]
pub enum ConsensusAuthority {
    /// Node whose network manager is [`TonicManager`].
    WithTonic(AuthorityNode<TonicManager>),
}
54
55impl ConsensusAuthority {
56 pub async fn start(
57 network_type: NetworkType,
58 epoch_start_timestamp_ms: u64,
59 committee: Committee,
60 parameters: Parameters,
61 protocol_config: ConsensusProtocolConfig,
62 protocol_keypair: Option<ProtocolKeyPair>,
64 network_keypair: NetworkKeyPair,
65 clock: Arc<Clock>,
66 transaction_verifier: Arc<dyn TransactionVerifier>,
67 commit_consumer: CommitConsumerArgs,
68 registry: Registry,
69 boot_counter: u64,
73 randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
74 ) -> Self {
75 match network_type {
76 NetworkType::Tonic => {
77 let authority = AuthorityNode::start(
78 epoch_start_timestamp_ms,
79 committee,
80 parameters,
81 protocol_config,
82 protocol_keypair,
83 network_keypair,
84 clock,
85 transaction_verifier,
86 commit_consumer,
87 registry,
88 boot_counter,
89 randomness_signature_handler,
90 )
91 .await;
92 Self::WithTonic(authority)
93 }
94 }
95 }
96
97 pub async fn stop(self) {
98 match self {
99 Self::WithTonic(authority) => authority.stop().await,
100 }
101 }
102
103 pub fn update_peer_address(
104 &self,
105 network_pubkey: NetworkPublicKey,
106 address: Option<Multiaddr>,
107 ) {
108 match self {
109 Self::WithTonic(authority) => authority.update_peer_address(network_pubkey, address),
110 }
111 }
112
113 pub fn transaction_client(&self) -> Arc<TransactionClient> {
114 match self {
115 Self::WithTonic(authority) => authority.transaction_client(),
116 }
117 }
118
119 pub fn store(&self) -> Arc<RocksDBStore> {
120 match self {
121 Self::WithTonic(authority) => authority.store(),
122 }
123 }
124
125 #[cfg(test)]
126 fn context(&self) -> &Arc<Context> {
127 match self {
128 Self::WithTonic(authority) => &authority.context,
129 }
130 }
131}
132
/// Selects the network implementation a [`ConsensusAuthority`] is started with.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkType {
    /// Start the node with the Tonic network manager.
    Tonic,
}
137
/// The subscriber held by an [`AuthorityNode`]. Which variant is built depends
/// on whether the node was started as a validator or as an observer.
enum SubscriberType<N: NetworkManager> {
    /// Subscriber built on the validator path, using the network's validator
    /// client and the authority service.
    Validator(Subscriber<N::ValidatorClient, AuthorityService<ChannelCoreThreadDispatcher>>),
    /// Subscriber built on the observer path, using the network's observer
    /// client and the observer service.
    Observer(ObserverSubscriber<N::ObserverClient, ObserverService>),
}
143
144impl<N: NetworkManager> SubscriberType<N> {
145 fn stop(&self) {
146 match self {
147 SubscriberType::Validator(subscriber) => subscriber.stop(),
148 SubscriberType::Observer(subscriber) => subscriber.stop(),
149 }
150 }
151}
152
/// A started consensus node, generic over the network manager `N`.
///
/// Holds the shared handles exposed to callers plus the handles of the
/// background components that `stop` shuts down.
pub(crate) struct AuthorityNode<N>
where
    N: NetworkManager,
{
    /// Shared node context (committee, parameters, metrics, own index, ...).
    context: Arc<Context>,
    /// Captured during `start`; used to log run time and record uptime on `stop`.
    start_time: Instant,
    /// Client handed out by `transaction_client()`.
    transaction_client: Arc<TransactionClient>,
    /// Handle of the synchronizer; the first component stopped on shutdown.
    synchronizer: Arc<SynchronizerHandle>,
    /// Store handed out by `store()`.
    store: Arc<RocksDBStore>,

    /// Handle of the commit syncer started in `start`.
    commit_syncer_handle: CommitSyncerHandle,
    /// Handle of the round prober; `Some` only when started as a validator.
    round_prober_handle: Option<RoundProberHandle>,
    /// Handle of the leader timeout task.
    leader_timeout_handle: LeaderTimeoutTaskHandle,
    /// Handle of the core thread.
    core_thread_handle: CoreThreadHandle,
    /// Validator or observer subscriber, depending on how the node was started.
    subscriber: SubscriberType<N>,
    /// Network manager owning the servers and clients of this node.
    network_manager: N,
}
170
171impl<N> AuthorityNode<N>
172where
173 N: NetworkManager,
174{
    /// Builds and starts every component of a consensus node and returns the
    /// handle that owns them.
    ///
    /// When `protocol_keypair` is `Some`, the node runs as a validator whose
    /// index is looked up in `committee` by protocol public key. When it is
    /// `None`, the node runs as an observer.
    ///
    /// # Panics
    ///
    /// * if `protocol_keypair` is given but its public key is not in `committee`;
    /// * if `parameters.db_path` is not valid UTF-8.
    pub(crate) async fn start(
        epoch_start_timestamp_ms: u64,
        committee: Committee,
        parameters: Parameters,
        protocol_config: ConsensusProtocolConfig,
        protocol_keypair: Option<ProtocolKeyPair>,
        network_keypair: NetworkKeyPair,
        clock: Arc<Clock>,
        transaction_verifier: Arc<dyn TransactionVerifier>,
        commit_consumer: CommitConsumerArgs,
        registry: Registry,
        boot_counter: u64,
        randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
    ) -> Self {
        let metrics = initialise_metrics(registry);

        // A protocol keypair marks this node as a validator; without one it
        // starts as an observer and has no own index.
        let own_index = if let Some(protocol_keypair) = &protocol_keypair {
            // Locate ourselves in the committee by protocol public key.
            let (own_index, _) = committee
                .authorities()
                .find(|(_, a)| a.protocol_key == protocol_keypair.public())
                .expect("Own authority should be among the consensus authorities!");

            let own_hostname = committee.authority(own_index).hostname.clone();
            info!(
                "Starting consensus validator authority {} {}, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                own_index,
                own_hostname,
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );

            // Publish our authority index, labelled by hostname.
            metrics
                .node_metrics
                .authority_index
                .with_label_values(&[&own_hostname])
                .set(own_index.value() as i64);
            Some(own_index)
        } else {
            info!(
                "Starting consensus observer authority, {:?}, epoch start timestamp {}, boot counter {}, replaying after commit index {}, consumer last processed commit index {}",
                protocol_config.protocol_version(),
                epoch_start_timestamp_ms,
                boot_counter,
                commit_consumer.replay_after_commit_index,
                commit_consumer.consumer_last_processed_commit_index
            );
            None
        };

        info!(
            "Consensus authorities: {}",
            committee
                .authorities()
                .map(|(i, a)| format!("{}: {}", i, a.hostname))
                .join(", ")
        );
        info!("Consensus parameters: {:?}", parameters);
        info!("Consensus committee: {:?}", committee);
        let context = Arc::new(Context::new(
            epoch_start_timestamp_ms,
            own_index,
            committee,
            parameters,
            protocol_config,
            metrics,
            clock,
        ));
        // Everything from here on counts towards the reported startup time.
        let start_time = Instant::now();

        context
            .metrics
            .node_metrics
            .protocol_version
            .set(context.protocol_config.protocol_version() as i64);

        let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
        let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

        let mut network_manager = N::new(context.clone(), network_keypair);
        let validator_client = network_manager.validator_client();
        let observer_client = network_manager.observer_client();

        // Both sync clients are given the validator and the observer client.
        let synchronizer_client = Arc::new(SynchronizerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));
        let commit_syncer_client = Arc::new(CommitSyncerClient::<
            N::ValidatorClient,
            N::ObserverClient,
        >::new(
            context.clone(),
            Some(validator_client.clone()),
            Some(observer_client.clone()),
        ));

        // Panics if the configured database path is not valid UTF-8.
        let store_path = context.parameters.db_path.as_path().to_str().unwrap();
        let store = Arc::new(RocksDBStore::new(store_path));
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_verifier = Arc::new(SignedBlockVerifier::new(
            context.clone(),
            transaction_verifier,
        ));

        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());

        // Enabled only on the first boot, with a non-zero timeout, and only
        // when running as a validator.
        let sync_last_known_own_block = boot_counter == 0
            && !context
                .parameters
                .sync_last_known_own_block_timeout
                .is_zero()
            && context.is_validator();
        info!(
            "Sync last known own block: {}. Boot count: {}. Timeout: {:?}.",
            sync_last_known_own_block,
            boot_counter,
            context.parameters.sync_last_known_own_block_timeout
        );

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());

        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        // Grab the monitor before `commit_consumer` is moved into the observer.
        let commit_consumer_monitor = commit_consumer.monitor();
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;

        // Seed the round tracker with the rounds of the blocks returned by
        // `get_last_cached_block_per_authority`.
        let initial_received_rounds = dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX)
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(
            context.clone(),
            initial_received_rounds,
        )));

        // Validators get a full core; observers get the reduced observer core.
        let core = if context.is_validator() {
            Core::new_validator(
                context.clone(),
                leader_schedule,
                tx_consumer,
                transaction_vote_tracker.clone(),
                block_manager,
                commit_observer,
                core_signals,
                protocol_keypair.expect("protocol keypair is required when running as validator"),
                dag_state.clone(),
                sync_last_known_own_block,
                round_tracker.clone(),
            )
        } else {
            Core::new_observer(
                context.clone(),
                leader_schedule,
                block_manager,
                commit_observer,
                core_signals,
                dag_state.clone(),
            )
        };

        let (core_dispatcher, core_thread_handle) =
            ChannelCoreThreadDispatcher::start(context.clone(), &dag_state, core);
        let core_dispatcher = Arc::new(core_dispatcher);
        let leader_timeout_handle =
            LeaderTimeoutTask::start(core_dispatcher.clone(), &signals_receivers, context.clone());

        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));

        // Shared between the synchronizer and the commit syncer.
        let peers_pool = Arc::new(PeersPool::new(context.clone()));

        let synchronizer = Synchronizer::start(
            synchronizer_client.clone(),
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            sync_last_known_own_block,
        );

        let commit_syncer_handle = CommitSyncer::new(
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            commit_syncer_client.clone(),
            dag_state.clone(),
            peers_pool.clone(),
        )
        .start();

        // Shared by the authority service and the observer service.
        let block_sync_service = Arc::new(BlockSyncService::new(
            context.clone(),
            dag_state.clone(),
            store.clone(),
        ));

        let (subscriber, round_prober_handle) = if context.is_validator() {
            // Validator path: validator server, subscriptions to the other
            // committee members, round prober, and optionally an observer server.
            let authority_service = Arc::new(AuthorityService::new(
                context.clone(),
                block_verifier.clone(),
                commit_vote_monitor.clone(),
                round_tracker.clone(),
                synchronizer.clone(),
                core_dispatcher.clone(),
                signals_receivers.block_broadcast_receiver(),
                transaction_vote_tracker.clone(),
                dag_state.clone(),
                block_sync_service.clone(),
            ));

            network_manager
                .start_validator_server(authority_service.clone())
                .await;

            let s = Subscriber::new(
                context.clone(),
                validator_client.clone(),
                authority_service.clone(),
                dag_state.clone(),
            );
            // Subscribe to every committee member except ourselves.
            for (peer, _) in context.committee.authorities() {
                if peer != context.own_index {
                    s.subscribe(peer);
                }
            }

            let round_prober_handle = Some(
                RoundProber::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    round_tracker.clone(),
                    dag_state.clone(),
                    validator_client,
                )
                .start(),
            );

            // A validator serves observers only when that is enabled in the
            // observer parameters.
            if context.parameters.observer.is_server_enabled() {
                let observer_service = Arc::new(ObserverService::new(
                    context.clone(),
                    core_dispatcher.clone(),
                    dag_state.clone(),
                    signals_receivers.accepted_block_broadcast_receiver(),
                    block_verifier,
                    commit_vote_monitor.clone(),
                    transaction_vote_tracker.clone(),
                    synchronizer.clone(),
                    block_sync_service.clone(),
                    randomness_signature_handler.clone(),
                ));
                network_manager
                    .start_observer_server(observer_service)
                    .await;
            }

            (SubscriberType::Validator(s), round_prober_handle)
        } else {
            // Observer path: observer server plus a subscription to a
            // configured peer; no round prober is started.
            let observer_client = network_manager.observer_client();
            let observer_service = Arc::new(ObserverService::new(
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                signals_receivers.accepted_block_broadcast_receiver(),
                block_verifier,
                commit_vote_monitor.clone(),
                transaction_vote_tracker.clone(),
                synchronizer.clone(),
                block_sync_service.clone(),
                randomness_signature_handler.clone(),
            ));

            let observer_subscriber = ObserverSubscriber::new(
                context.clone(),
                observer_client,
                observer_service.clone(),
                commit_vote_monitor.clone(),
                dag_state.clone(),
                randomness_signature_handler,
            );

            network_manager
                .start_observer_server(observer_service)
                .await;

            // Only the first configured peer is subscribed to (`take(1)`).
            for peer_record in context.parameters.observer.peers.iter().take(1) {
                // A peer whose network key belongs to a committee member is
                // addressed as a validator, any other peer as an observer.
                let peer_id = if let Some((index, _)) = context
                    .committee
                    .authorities()
                    .find(|(_, authority)| authority.network_key == peer_record.public_key)
                {
                    PeerId::Validator(index)
                } else {
                    PeerId::Observer(Box::new(peer_record.public_key.clone()))
                };

                info!("Observer subscribing to peer: {:?}", peer_id);
                observer_subscriber.subscribe(peer_id);
            }

            (SubscriberType::Observer(observer_subscriber), None)
        };

        info!(
            "Consensus authority started, took {:?}",
            start_time.elapsed()
        );

        Self {
            context,
            start_time,
            transaction_client: Arc::new(tx_client),
            synchronizer,
            store,
            commit_syncer_handle,
            round_prober_handle,
            leader_timeout_handle,
            core_thread_handle,
            subscriber,
            network_manager,
        }
    }
540
541 pub(crate) async fn stop(mut self) {
542 info!(
543 "Stopping authority. Total run time: {:?}",
544 self.start_time.elapsed()
545 );
546
547 if let Err(e) = self.synchronizer.stop().await {
549 if e.is_panic() {
550 std::panic::resume_unwind(e.into_panic());
551 }
552 warn!(
553 "Failed to stop synchronizer when shutting down consensus: {:?}",
554 e
555 );
556 };
557 self.commit_syncer_handle.stop().await;
558 if let Some(round_prober_handle) = self.round_prober_handle {
559 round_prober_handle.stop().await;
560 }
561 self.leader_timeout_handle.stop().await;
562 self.core_thread_handle.stop().await;
564 self.subscriber.stop();
566 self.network_manager.stop().await;
567
568 self.context
569 .metrics
570 .node_metrics
571 .uptime
572 .observe(self.start_time.elapsed().as_secs_f64());
573 }
574
575 pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
576 self.transaction_client.clone()
577 }
578
579 pub(crate) fn store(&self) -> Arc<RocksDBStore> {
580 self.store.clone()
581 }
582
583 pub(crate) fn update_peer_address(
584 &self,
585 network_pubkey: NetworkPublicKey,
586 address: Option<Multiaddr>,
587 ) {
588 let Some(peer) = self
590 .context
591 .committee
592 .authorities()
593 .find(|(_, authority)| authority.network_key == network_pubkey)
594 .map(|(index, _)| index)
595 else {
596 warn!(
597 "Network public key {:?} not found in committee, ignoring address update",
598 network_pubkey
599 );
600 return;
601 };
602
603 self.network_manager.update_peer_address(peer, address);
605
606 if peer != self.context.own_index {
608 info!("Re-subscribing to peer {} after address update", peer);
609 match &self.subscriber {
610 SubscriberType::Validator(s) => s.subscribe(peer),
611 SubscriberType::Observer(s) => {
612 s.subscribe(PeerId::Validator(peer));
614 }
615 }
616 }
617 }
618}
619
620#[cfg(test)]
621mod tests {
622 #![allow(non_snake_case)]
623
624 use std::{
625 collections::{BTreeMap, BTreeSet},
626 sync::Arc,
627 time::Duration,
628 };
629
630 use consensus_config::{
631 AuthorityIndex, ObserverParameters, Parameters, PeerRecord, local_committee_and_keys,
632 };
633 use mysten_metrics::RegistryService;
634 use mysten_metrics::monitored_mpsc::UnboundedReceiver;
635 use prometheus::Registry;
636 use rand::{SeedableRng, rngs::StdRng};
637 use rstest::rstest;
638 use tempfile::TempDir;
639 use tokio::time::{sleep, timeout};
640 use typed_store::DBMetrics;
641
642 use super::*;
643 use crate::{
644 CommittedSubDag,
645 block::{BlockAPI as _, GENESIS_ROUND},
646 transaction::NoopTransactionVerifier,
647 };
648
649 #[rstest]
650 #[tokio::test]
651 async fn test_authority_start_and_stop(
652 #[values(NetworkType::Tonic)] network_type: NetworkType,
653 ) {
654 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
655 let registry = Registry::new();
656
657 let temp_dir = TempDir::new().unwrap();
658 let parameters = Parameters {
659 db_path: temp_dir.keep(),
660 ..Default::default()
661 };
662 let txn_verifier = NoopTransactionVerifier {};
663
664 let own_index = committee.to_authority_index(0).unwrap();
665 let protocol_keypair = keypairs[own_index].1.clone();
666 let network_keypair = keypairs[own_index].0.clone();
667
668 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
669
670 let authority = ConsensusAuthority::start(
671 network_type,
672 0,
673 committee,
674 parameters,
675 ConsensusProtocolConfig::for_testing(),
676 Some(protocol_keypair),
677 network_keypair,
678 Arc::new(Clock::default()),
679 Arc::new(txn_verifier),
680 commit_consumer,
681 registry,
682 0,
683 None,
684 )
685 .await;
686
687 assert_eq!(authority.context().own_index, own_index);
688 assert_eq!(authority.context().committee.epoch(), 0);
689 assert_eq!(authority.context().committee.size(), 1);
690
691 authority.stop().await;
692 }
693
694 #[rstest]
695 #[tokio::test]
696 async fn test_observer_start_and_stop(#[values(NetworkType::Tonic)] network_type: NetworkType) {
697 let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
698 let registry = Registry::new();
699
700 let temp_dir = TempDir::new().unwrap();
701 let parameters = Parameters {
702 db_path: temp_dir.keep(),
703 ..Default::default()
704 };
705 let txn_verifier = NoopTransactionVerifier {};
706
707 let network_keypair = keypairs[0].0.clone();
709
710 let (commit_consumer, _) = CommitConsumerArgs::new(0, 0);
711
712 let observer = ConsensusAuthority::start(
713 network_type,
714 0,
715 committee.clone(),
716 parameters,
717 ConsensusProtocolConfig::for_testing(),
718 None, network_keypair,
720 Arc::new(Clock::default()),
721 Arc::new(txn_verifier),
722 commit_consumer,
723 registry,
724 0,
725 None,
726 )
727 .await;
728
729 sleep(Duration::from_secs(2)).await;
730
731 assert_eq!(observer.context().own_index, AuthorityIndex::MAX);
733 assert_eq!(observer.context().committee.epoch(), 0);
734 assert_eq!(observer.context().committee.size(), 1);
735 assert!(!observer.context().is_validator());
736
737 observer.stop().await;
738 }
739
740 #[rstest]
745 #[tokio::test(flavor = "current_thread")]
746 async fn test_authority_committee(
747 #[values(NetworkType::Tonic)] network_type: NetworkType,
748 #[values(5, 10)] gc_depth: u32,
749 ) {
750 telemetry_subscribers::init_for_testing();
751 let db_registry = Registry::new();
752 DBMetrics::init(RegistryService::new(db_registry));
753
754 const NUM_OF_AUTHORITIES: usize = 4;
755 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
756 let mut protocol_config = ConsensusProtocolConfig::for_testing();
757 protocol_config.set_gc_depth_for_testing(gc_depth);
758
759 let temp_dirs = (0..NUM_OF_AUTHORITIES)
760 .map(|_| TempDir::new().unwrap())
761 .collect::<Vec<_>>();
762
763 let mut commit_receivers = Vec::with_capacity(committee.size());
764 let mut authorities = Vec::with_capacity(committee.size());
765 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
766
767 let observer_server_port = 8900 + gc_depth as u16;
769
770 let mut authority_0_network_key = None;
772 for (index, authority_info) in committee.authorities() {
773 let (authority, commit_receiver) = if index.value() == 0 {
774 authority_0_network_key = Some(authority_info.network_key.clone());
776 make_authority_with_observer_server(
778 index,
779 &temp_dirs[index.value()],
780 committee.clone(),
781 keypairs.clone(),
782 network_type,
783 boot_counters[index],
784 protocol_config.clone(),
785 Some(observer_server_port),
786 )
787 .await
788 } else {
789 make_authority(
790 index,
791 &temp_dirs[index.value()],
792 committee.clone(),
793 keypairs.clone(),
794 network_type,
795 boot_counters[index],
796 protocol_config.clone(),
797 )
798 .await
799 };
800 boot_counters[index] += 1;
801 commit_receivers.push(commit_receiver);
802 authorities.push(authority);
803 }
804
805 let observer_temp_dir = TempDir::new().unwrap();
807 let mut rng = StdRng::from_seed([99; 32]);
808 let observer_network_keypair = consensus_config::NetworkKeyPair::generate(&mut rng);
809
810 let observer_parameters = Parameters {
811 db_path: observer_temp_dir.path().to_path_buf(),
812 observer: ObserverParameters {
813 peers: vec![PeerRecord {
815 public_key: authority_0_network_key
816 .clone()
817 .expect("Authority 0 network key should be set"),
818 address: format!("/ip4/127.0.0.1/udp/{}", observer_server_port)
819 .parse()
820 .unwrap(),
821 }],
822 ..Default::default()
823 },
824 ..Default::default()
825 };
826
827 let (observer_commit_consumer, observer_commit_receiver) = CommitConsumerArgs::new(0, 0);
828 let observer = ConsensusAuthority::start(
829 network_type,
830 0,
831 committee.clone(),
832 observer_parameters,
833 protocol_config.clone(),
834 None, observer_network_keypair,
836 Arc::new(Clock::default()),
837 Arc::new(NoopTransactionVerifier {}),
838 observer_commit_consumer,
839 Registry::new(),
840 0,
841 None,
842 )
843 .await;
844 commit_receivers.push(observer_commit_receiver);
847
848 sleep(Duration::from_secs(5)).await;
850
851 const NUM_TRANSACTIONS: u8 = 15;
852 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
853 for i in 0..NUM_TRANSACTIONS {
854 let txn = vec![i; 16];
855 submitted_transactions.insert(txn.clone());
856 authorities[i as usize % authorities.len()]
857 .transaction_client()
858 .submit(vec![txn])
859 .await
860 .unwrap();
861 }
862
863 for receiver in &mut commit_receivers {
864 let mut expected_transactions = submitted_transactions.clone();
865 loop {
866 let committed_subdag =
867 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
868 .await
869 .unwrap()
870 .unwrap();
871 for b in committed_subdag.blocks {
872 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
873 assert!(
874 expected_transactions.remove(&txn),
875 "Transaction not submitted or already seen: {:?}",
876 txn
877 );
878 }
879 }
880 if expected_transactions.is_empty() {
881 break;
882 }
883 }
884 }
885
886 let index = committee.to_authority_index(1).unwrap();
888 authorities.remove(index.value()).stop().await;
889 sleep(Duration::from_secs(10)).await;
890
891 let (authority, commit_receiver) = make_authority(
893 index,
894 &temp_dirs[index.value()],
895 committee.clone(),
896 keypairs.clone(),
897 network_type,
898 boot_counters[index],
899 protocol_config.clone(),
900 )
901 .await;
902 boot_counters[index] += 1;
903 commit_receivers[index] = commit_receiver;
904 authorities.insert(index.value(), authority);
905 sleep(Duration::from_secs(10)).await;
906
907 let observer_context = observer.context();
912 assert!(
913 observer_context.is_observer(),
914 "It should be an observer node"
915 );
916
917 let authority_0 = &authorities[0];
919 let authority_0_context = authority_0.context();
920 let mut authority_0_total_verified_blocks = 0;
921
922 for (_, authority_info) in committee.authorities() {
924 if let Ok(metric) = authority_0_context
925 .metrics
926 .node_metrics
927 .verified_blocks
928 .get_metric_with_label_values(&[&authority_info.hostname])
929 {
930 authority_0_total_verified_blocks += metric.get();
931 println!(
932 "authority_info.hostname: {}, metric: {:?}",
933 authority_info.hostname, authority_0_total_verified_blocks
934 );
935 }
936 }
937
938 let mut authority_0_total_proposed_blocks = 0;
939 for force in [true, false] {
940 if let Ok(metric) = authority_0_context
941 .metrics
942 .node_metrics
943 .proposed_blocks
944 .get_metric_with_label_values(&[&force.to_string()])
945 {
946 authority_0_total_proposed_blocks += metric.get();
947 }
948 }
949
950 authority_0_total_verified_blocks += authority_0_total_proposed_blocks;
951
952 let mut observer_received_blocks = 0;
954 for (_, authority_info) in committee.authorities() {
955 if let Ok(metric) = observer_context
956 .metrics
957 .node_metrics
958 .verified_blocks
959 .get_metric_with_label_values(&[&authority_info.hostname])
960 {
961 observer_received_blocks += metric.get();
962 }
963 }
964
965 assert!(
968 observer_received_blocks > 0,
969 "Observer should have received at least some blocks, got: {}",
970 observer_received_blocks
971 );
972
973 println!(
974 "authority_0_total_verified_blocks: {}, observer_received_blocks: {}",
975 authority_0_total_verified_blocks, observer_received_blocks
976 );
977
978 const TOLERANCE: u64 = 20;
979 assert!(
980 authority_0_total_verified_blocks - observer_received_blocks <= TOLERANCE,
981 "The number of blocks received by the observer ({}) should be close to the number of blocks verified by authority 0 ({})",
982 observer_received_blocks,
983 authority_0_total_verified_blocks,
984 );
985
986 observer.stop().await;
988
989 for authority in authorities {
991 authority.stop().await;
992 }
993 }
994
995 #[rstest]
996 #[tokio::test(flavor = "current_thread")]
997 async fn test_small_committee(
998 #[values(NetworkType::Tonic)] network_type: NetworkType,
999 #[values(1, 2, 3)] num_authorities: usize,
1000 ) {
1001 telemetry_subscribers::init_for_testing();
1002 let db_registry = Registry::new();
1003 DBMetrics::init(RegistryService::new(db_registry));
1004
1005 let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
1006 let protocol_config = ConsensusProtocolConfig::for_testing();
1007
1008 let temp_dirs = (0..num_authorities)
1009 .map(|_| TempDir::new().unwrap())
1010 .collect::<Vec<_>>();
1011
1012 let mut output_receivers = Vec::with_capacity(committee.size());
1013 let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
1014 let mut boot_counters = vec![0; num_authorities];
1015
1016 for (index, _authority_info) in committee.authorities() {
1017 let (authority, commit_receiver) = make_authority(
1018 index,
1019 &temp_dirs[index.value()],
1020 committee.clone(),
1021 keypairs.clone(),
1022 network_type,
1023 boot_counters[index],
1024 protocol_config.clone(),
1025 )
1026 .await;
1027 boot_counters[index] += 1;
1028 output_receivers.push(commit_receiver);
1029 authorities.push(authority);
1030 }
1031
1032 const NUM_TRANSACTIONS: u8 = 15;
1033 let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
1034 for i in 0..NUM_TRANSACTIONS {
1035 let txn = vec![i; 16];
1036 submitted_transactions.insert(txn.clone());
1037 authorities[i as usize % authorities.len()]
1038 .transaction_client()
1039 .submit(vec![txn])
1040 .await
1041 .unwrap();
1042 }
1043
1044 for receiver in &mut output_receivers {
1045 let mut expected_transactions = submitted_transactions.clone();
1046 loop {
1047 let committed_subdag =
1048 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
1049 .await
1050 .unwrap()
1051 .unwrap();
1052 for b in committed_subdag.blocks {
1053 for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
1054 assert!(
1055 expected_transactions.remove(&txn),
1056 "Transaction not submitted or already seen: {:?}",
1057 txn
1058 );
1059 }
1060 }
1061 if expected_transactions.is_empty() {
1062 break;
1063 }
1064 }
1065 }
1066
1067 let index = committee.to_authority_index(0).unwrap();
1069 authorities.remove(index.value()).stop().await;
1070 sleep(Duration::from_secs(10)).await;
1071
1072 let (authority, commit_receiver) = make_authority(
1074 index,
1075 &temp_dirs[index.value()],
1076 committee.clone(),
1077 keypairs.clone(),
1078 network_type,
1079 boot_counters[index],
1080 protocol_config.clone(),
1081 )
1082 .await;
1083 boot_counters[index] += 1;
1084 output_receivers[index] = commit_receiver;
1085 authorities.insert(index.value(), authority);
1086 sleep(Duration::from_secs(10)).await;
1087
1088 for authority in authorities {
1090 authority.stop().await;
1091 }
1092 }
1093
1094 #[rstest]
1095 #[tokio::test(flavor = "current_thread")]
1096 async fn test_amnesia_recovery_success(#[values(5, 10)] gc_depth: u32) {
1097 telemetry_subscribers::init_for_testing();
1098 let db_registry = Registry::new();
1099 DBMetrics::init(RegistryService::new(db_registry));
1100
1101 const NUM_OF_AUTHORITIES: usize = 4;
1102 let (committee, keypairs) = local_committee_and_keys(0, [1; NUM_OF_AUTHORITIES].to_vec());
1103 let mut commit_receivers = vec![];
1104 let mut authorities = BTreeMap::new();
1105 let mut temp_dirs = BTreeMap::new();
1106 let mut boot_counters = [0; NUM_OF_AUTHORITIES];
1107
1108 let mut protocol_config = ConsensusProtocolConfig::for_testing();
1109 protocol_config.set_gc_depth_for_testing(gc_depth);
1110
1111 for (index, _authority_info) in committee.authorities() {
1112 let dir = TempDir::new().unwrap();
1113 let (authority, commit_receiver) = make_authority(
1114 index,
1115 &dir,
1116 committee.clone(),
1117 keypairs.clone(),
1118 NetworkType::Tonic,
1119 boot_counters[index],
1120 protocol_config.clone(),
1121 )
1122 .await;
1123 boot_counters[index] += 1;
1124 commit_receivers.push(commit_receiver);
1125 authorities.insert(index, authority);
1126 temp_dirs.insert(index, dir);
1127 }
1128
1129 let index_1 = committee.to_authority_index(1).unwrap();
1133 'outer: while let Some(result) =
1134 timeout(Duration::from_secs(10), commit_receivers[index_1].recv())
1135 .await
1136 .expect("Timed out while waiting for at least one committed block from authority 1")
1137 {
1138 for block in result.blocks {
1139 if block.round() > GENESIS_ROUND && block.author() == index_1 {
1140 break 'outer;
1141 }
1142 }
1143 }
1144
1145 authorities.remove(&index_1).unwrap().stop().await;
1151 let index_2 = committee.to_authority_index(2).unwrap();
1152 authorities.remove(&index_2).unwrap().stop().await;
1153 sleep(Duration::from_secs(5)).await;
1154
1155 let dir = TempDir::new().unwrap();
1159 boot_counters[index_1] = 0;
1161 let (authority, mut commit_receiver) = make_authority(
1162 index_1,
1163 &dir,
1164 committee.clone(),
1165 keypairs.clone(),
1166 NetworkType::Tonic,
1167 boot_counters[index_1],
1168 protocol_config.clone(),
1169 )
1170 .await;
1171 boot_counters[index_1] += 1;
1172 authorities.insert(index_1, authority);
1173 temp_dirs.insert(index_1, dir);
1174 sleep(Duration::from_secs(5)).await;
1175
1176 let (authority, _commit_receiver) = make_authority(
1179 index_2,
1180 &temp_dirs[&index_2],
1181 committee.clone(),
1182 keypairs,
1183 NetworkType::Tonic,
1184 boot_counters[index_2],
1185 protocol_config.clone(),
1186 )
1187 .await;
1188 boot_counters[index_2] += 1;
1189 authorities.insert(index_2, authority);
1190 sleep(Duration::from_secs(5)).await;
1191
1192 'outer: while let Some(result) = commit_receiver.recv().await {
1194 for block in result.blocks {
1195 if block.round() > GENESIS_ROUND && block.author() == index_1 {
1196 break 'outer;
1197 }
1198 }
1199 }
1200
1201 for (_, authority) in authorities {
1203 authority.stop().await;
1204 }
1205 }
1206
    /// Test helper: starts a validator authority and returns it together
    /// with the receiver of its committed sub-DAGs.
    ///
    /// Delegates to `make_authority_with_observer_server`, passing `None`
    /// for the observer server port.
    async fn make_authority(
        index: AuthorityIndex,
        db_dir: &TempDir,
        committee: Committee,
        keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
        network_type: NetworkType,
        boot_counter: u64,
        protocol_config: ConsensusProtocolConfig,
    ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
        make_authority_with_observer_server(
            index,
            db_dir,
            committee,
            keypairs,
            network_type,
            boot_counter,
            protocol_config,
            // No observer server port.
            None,
        )
        .await
    }
1229
1230 async fn make_authority_with_observer_server(
1231 index: AuthorityIndex,
1232 db_dir: &TempDir,
1233 committee: Committee,
1234 keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
1235 network_type: NetworkType,
1236 boot_counter: u64,
1237 protocol_config: ConsensusProtocolConfig,
1238 observer_server_port: Option<u16>,
1239 ) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
1240 let registry = Registry::new();
1241
1242 let mut parameters = Parameters {
1244 db_path: db_dir.path().to_path_buf(),
1245 dag_state_cached_rounds: 5,
1246 commit_sync_parallel_fetches: 2,
1247 commit_sync_batch_size: 3,
1248 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1249 ..Default::default()
1250 };
1251
1252 if let Some(port) = observer_server_port {
1254 parameters.observer.server_port = Some(port);
1255 }
1256
1257 let txn_verifier = NoopTransactionVerifier {};
1258
1259 let protocol_keypair = keypairs[index].1.clone();
1260 let network_keypair = keypairs[index].0.clone();
1261
1262 let (commit_consumer, commit_receiver) = CommitConsumerArgs::new(0, 0);
1263
1264 let authority = ConsensusAuthority::start(
1265 network_type,
1266 0,
1267 committee,
1268 parameters,
1269 protocol_config,
1270 Some(protocol_keypair),
1271 network_keypair,
1272 Arc::new(Clock::default()),
1273 Arc::new(txn_verifier),
1274 commit_consumer,
1275 registry,
1276 boot_counter,
1277 None,
1278 )
1279 .await;
1280
1281 (authority, commit_receiver)
1282 }
1283}