1use std::{
5 collections::{BTreeMap, BTreeSet},
6 sync::Arc,
7 vec,
8};
9
10use consensus_config::ProtocolKeyPair;
11#[cfg(test)]
12use consensus_config::{AuthorityIndex, Stake, local_committee_and_keys};
13use consensus_types::block::{BlockRef, Round};
14use itertools::Itertools as _;
15#[cfg(test)]
16use mysten_metrics::monitored_mpsc::UnboundedReceiver;
17use mysten_metrics::monitored_scope;
18use parking_lot::RwLock;
19use sui_macros::fail_point;
20use tokio::sync::{broadcast, watch};
21use tracing::{debug, info, trace, warn};
22
23#[cfg(test)]
24use crate::{
25 CommitConsumerArgs, TransactionClient,
26 block_verifier::NoopBlockVerifier,
27 storage::{Store, WriteBatch, mem_store::MemStore},
28};
29use crate::{
30 ancestor::AncestorStateManager,
31 block::{BlockAPI, ExtendedBlock, GENESIS_ROUND, Slot, VerifiedBlock},
32 block_manager::BlockManager,
33 commit::{
34 CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
35 },
36 commit_observer::CommitObserver,
37 context::Context,
38 dag_state::DagState,
39 error::{ConsensusError, ConsensusResult},
40 leader_schedule::LeaderSchedule,
41 proposer::{Proposer, ValidatorProposer},
42 round_tracker::RoundTracker,
43 transaction::TransactionConsumer,
44 transaction_vote_tracker::TransactionVoteTracker,
45 universal_committer::{
46 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
47 },
48};
49
50pub(crate) struct Core {
51 context: Arc<Context>,
52 block_manager: BlockManager,
55 committer: Arc<UniversalCommitter>,
57 last_signaled_round: Round,
59 last_decided_leader: Slot,
64 leader_schedule: Arc<LeaderSchedule>,
67 commit_observer: CommitObserver,
70 signals: CoreSignals,
72 dag_state: Arc<RwLock<DagState>>,
74 proposer: Option<Box<dyn Proposer>>,
77}
78
79impl Core {
80 pub(crate) fn new_validator(
82 context: Arc<Context>,
83 leader_schedule: Arc<LeaderSchedule>,
84 transaction_consumer: TransactionConsumer,
85 transaction_vote_tracker: TransactionVoteTracker,
86 block_manager: BlockManager,
87 commit_observer: CommitObserver,
88 signals: CoreSignals,
89 block_signer: ProtocolKeyPair,
90 dag_state: Arc<RwLock<DagState>>,
91 sync_last_known_own_block: bool,
92 round_tracker: Arc<RwLock<RoundTracker>>,
93 ) -> Self {
94 let last_decided_leader = dag_state.read().last_commit_leader();
95 let number_of_leaders = context.protocol_config.num_leaders_per_round().unwrap_or(1);
96 let committer = Arc::new(
97 UniversalCommitterBuilder::new(
98 context.clone(),
99 leader_schedule.clone(),
100 dag_state.clone(),
101 )
102 .with_number_of_leaders(number_of_leaders)
103 .with_pipeline(true)
104 .build(),
105 );
106
107 let last_proposed_block = dag_state
108 .read()
109 .get_last_proposed_block()
110 .expect("A block should have been returned");
111 let last_signaled_round = last_proposed_block.round();
112
113 let mut last_included_ancestors = vec![None; context.committee.size()];
122 for ancestor in last_proposed_block.ancestors() {
123 last_included_ancestors[ancestor.author] = Some(*ancestor);
124 }
125
126 let last_known_proposed_round = if sync_last_known_own_block {
127 None
128 } else {
129 Some(0)
131 };
132
133 let propagation_scores = leader_schedule
134 .leader_swap_table
135 .read()
136 .reputation_scores
137 .clone();
138 let mut ancestor_state_manager =
139 AncestorStateManager::new(context.clone(), dag_state.clone());
140 ancestor_state_manager.set_propagation_scores(propagation_scores);
141
142 let proposer = Some(Box::new(ValidatorProposer::new(
144 dag_state.clone(),
145 context.clone(),
146 transaction_consumer,
147 transaction_vote_tracker.clone(),
148 block_signer,
149 last_known_proposed_round,
150 ancestor_state_manager,
151 round_tracker.clone(),
152 committer.clone(),
153 )) as Box<dyn Proposer>);
154
155 Self {
156 context,
157 last_signaled_round,
158 last_decided_leader,
159 leader_schedule,
160 block_manager,
161 committer,
162 commit_observer,
163 signals,
164 dag_state,
165 proposer,
166 }
167 .recover_validator()
168 }
169
170 pub(crate) fn new_observer(
172 context: Arc<Context>,
173 leader_schedule: Arc<LeaderSchedule>,
174 block_manager: BlockManager,
175 commit_observer: CommitObserver,
176 signals: CoreSignals,
177 dag_state: Arc<RwLock<DagState>>,
178 ) -> Self {
179 let last_decided_leader = dag_state.read().last_commit_leader();
180 let number_of_leaders = context.protocol_config.num_leaders_per_round().unwrap_or(1);
181 let committer = Arc::new(
182 UniversalCommitterBuilder::new(
183 context.clone(),
184 leader_schedule.clone(),
185 dag_state.clone(),
186 )
187 .with_number_of_leaders(number_of_leaders)
188 .with_pipeline(true)
189 .build(),
190 );
191
192 let last_signaled_round = dag_state.read().threshold_clock_round();
194
195 Self {
196 context,
197 last_signaled_round,
198 last_decided_leader,
199 leader_schedule,
200 block_manager,
201 committer,
202 commit_observer,
203 signals,
204 dag_state,
205 proposer: None,
206 }
207 .recover_observer()
208 }
209
210 fn recover_observer(mut self) -> Self {
211 let _s = self
212 .context
213 .metrics
214 .node_metrics
215 .scope_processing_time
216 .with_label_values(&["Core::recover_observer"])
217 .start_timer();
218
219 self.try_commit(vec![]).unwrap();
221
222 self.try_signal_new_round();
223
224 self
225 }
226
227 fn recover_validator(mut self) -> Self {
228 let _s = self
229 .context
230 .metrics
231 .node_metrics
232 .scope_processing_time
233 .with_label_values(&["Core::recover_validator"])
234 .start_timer();
235
236 self.try_commit(vec![]).unwrap();
238
239 let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
240 {
241 last_proposed_block
242 } else {
243 let proposer = self
244 .proposer
245 .as_ref()
246 .expect("Validator must have proposer");
247 let last_proposed_block = proposer.last_proposed_block();
248
249 if proposer.should_propose() {
250 assert!(
251 last_proposed_block.round() > GENESIS_ROUND,
252 "At minimum a block of round higher than genesis should have been produced during recovery"
253 );
254 }
255
256 self.signals
258 .new_block(ExtendedBlock {
259 block: last_proposed_block.clone(),
260 excluded_ancestors: vec![],
261 })
262 .unwrap();
263 last_proposed_block
264 };
265
266 self.try_signal_new_round();
270
271 info!(
272 "Core recovery for validator completed with last proposed block {:?}",
273 last_proposed_block
274 );
275
276 self
277 }
278
279 fn accept_blocks(
282 &mut self,
283 blocks: Vec<VerifiedBlock>,
284 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
285 let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
286 for block in &accepted_blocks {
287 tracing::trace!(
288 "{} Core accepted round {}, author {} at timestamp: {}",
289 self.context.own_index,
290 block.round(),
291 block.author(),
292 block.timestamp_ms()
293 );
294 self.signals.new_accepted_block(block.clone());
295 }
296 (accepted_blocks, missing_block_refs)
297 }
298
299 fn accept_committed_blocks(&mut self, blocks: Vec<VerifiedBlock>) -> Vec<VerifiedBlock> {
302 let accepted_blocks = self.block_manager.try_accept_committed_blocks(blocks);
303 for block in &accepted_blocks {
304 self.signals.new_accepted_block(block.clone());
305 }
306 accepted_blocks
307 }
308
309 #[tracing::instrument(skip_all)]
313 pub(crate) fn add_blocks(
314 &mut self,
315 blocks: Vec<VerifiedBlock>,
316 ) -> ConsensusResult<BTreeSet<BlockRef>> {
317 let _scope = monitored_scope("Core::add_blocks");
318 let _s = self
319 .context
320 .metrics
321 .node_metrics
322 .scope_processing_time
323 .with_label_values(&["Core::add_blocks"])
324 .start_timer();
325 self.context
326 .metrics
327 .node_metrics
328 .core_add_blocks_batch_size
329 .observe(blocks.len() as f64);
330
331 let (accepted_blocks, missing_block_refs) = self.accept_blocks(blocks);
332
333 if !accepted_blocks.is_empty() {
334 trace!(
335 "Accepted blocks: {}",
336 accepted_blocks
337 .iter()
338 .map(|b| b.reference().to_string())
339 .join(",")
340 );
341
342 self.try_commit(vec![])?;
344
345 self.try_propose(false)?;
347
348 self.try_signal_new_round();
352 };
353
354 if !missing_block_refs.is_empty() {
355 trace!(
356 "Missing block refs: {}",
357 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
358 );
359 }
360
361 Ok(missing_block_refs)
362 }
363
364 #[tracing::instrument(skip_all)]
369 pub(crate) fn add_certified_commits(
370 &mut self,
371 certified_commits: CertifiedCommits,
372 ) -> ConsensusResult<BTreeSet<BlockRef>> {
373 let _scope = monitored_scope("Core::add_certified_commits");
374
375 let votes = certified_commits.votes().to_vec();
376 let commits = self
377 .filter_new_commits(certified_commits.commits().to_vec())
378 .expect("Certified commits validation failed");
379
380 let (_, missing_block_refs) = self.accept_blocks(votes);
384
385 self.try_commit(commits)?;
387
388 self.try_propose(false)?;
390
391 self.try_signal_new_round();
395
396 Ok(missing_block_refs)
397 }
398
399 pub(crate) fn check_block_refs(
402 &mut self,
403 block_refs: Vec<BlockRef>,
404 ) -> ConsensusResult<BTreeSet<BlockRef>> {
405 let _scope = monitored_scope("Core::check_block_refs");
406 let _s = self
407 .context
408 .metrics
409 .node_metrics
410 .scope_processing_time
411 .with_label_values(&["Core::check_block_refs"])
412 .start_timer();
413 self.context
414 .metrics
415 .node_metrics
416 .core_check_block_refs_batch_size
417 .observe(block_refs.len() as f64);
418
419 let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
421
422 if !missing_block_refs.is_empty() {
423 trace!(
424 "Missing block refs: {}",
425 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
426 );
427 }
428 Ok(missing_block_refs)
429 }
430
431 fn try_signal_new_round(&mut self) {
433 let new_clock_round = self.dag_state.read().threshold_clock_round();
438 if new_clock_round <= self.last_signaled_round {
439 return;
440 }
441 self.signals.new_round(new_clock_round);
443 self.last_signaled_round = new_clock_round;
444
445 self.context
447 .metrics
448 .node_metrics
449 .threshold_clock_round
450 .set(new_clock_round as i64);
451 }
452
453 pub(crate) fn new_block(
457 &mut self,
458 round: Round,
459 force: bool,
460 ) -> ConsensusResult<Option<VerifiedBlock>> {
461 let _scope = monitored_scope("Core::new_block");
462 if let Some(last_round) = self.last_proposed_round()
463 && last_round < round
464 {
465 self.context
466 .metrics
467 .node_metrics
468 .leader_timeout_total
469 .with_label_values(&[&format!("{force}")])
470 .inc();
471 let result = self.try_propose(force);
472 self.try_signal_new_round();
474 return result;
475 }
476 Ok(None)
477 }
478
479 fn filter_new_commits(
482 &mut self,
483 commits: Vec<CertifiedCommit>,
484 ) -> ConsensusResult<Vec<CertifiedCommit>> {
485 let last_commit_index = self.dag_state.read().last_commit_index();
487 let commits = commits
488 .iter()
489 .filter(|commit| {
490 if commit.index() > last_commit_index {
491 true
492 } else {
493 tracing::debug!(
494 "Skip commit for index {} as it is already committed with last commit index {}",
495 commit.index(),
496 last_commit_index
497 );
498 false
499 }
500 })
501 .cloned()
502 .collect::<Vec<_>>();
503
504 if let Some(commit) = commits.first()
506 && commit.index() != last_commit_index + 1
507 {
508 return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
509 expected_commit_index: last_commit_index + 1,
510 commit_index: commit.index(),
511 });
512 }
513
514 Ok(commits)
515 }
516
517 fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
521 if let Some(proposer) = &mut self.proposer
522 && let Some(extended_block) = proposer.try_new_block(force)
523 {
524 self.signals.new_block(extended_block.clone())?;
525 self.signals
526 .new_accepted_block(extended_block.block.clone());
527
528 fail_point!("consensus-after-propose");
529
530 self.try_commit(vec![])?;
532 return Ok(Some(extended_block.block));
533 }
534 Ok(None)
535 }
536
537 fn try_commit(
540 &mut self,
541 mut certified_commits: Vec<CertifiedCommit>,
542 ) -> ConsensusResult<Vec<CommittedSubDag>> {
543 let _s = self
544 .context
545 .metrics
546 .node_metrics
547 .scope_processing_time
548 .with_label_values(&["Core::try_commit"])
549 .start_timer();
550
551 let mut certified_commits_map = BTreeMap::new();
552 for c in &certified_commits {
553 certified_commits_map.insert(c.index(), c.reference());
554 }
555
556 if !certified_commits.is_empty() {
557 info!(
558 "Processing synced commits: {:?}",
559 certified_commits
560 .iter()
561 .map(|c| (c.index(), c.leader()))
562 .collect::<Vec<_>>()
563 );
564 }
565
566 let mut committed_sub_dags = Vec::new();
567 loop {
569 let mut commits_until_update = self
574 .leader_schedule
575 .commits_until_leader_schedule_update(self.dag_state.clone());
576
577 if commits_until_update == 0 {
578 let last_commit_index = self.dag_state.read().last_commit_index();
579
580 tracing::info!(
581 "Leader schedule change triggered at commit index {last_commit_index}"
582 );
583
584 self.leader_schedule
585 .update_leader_schedule_v2(&self.dag_state);
586
587 let propagation_scores = self
588 .leader_schedule
589 .leader_swap_table
590 .read()
591 .reputation_scores
592 .clone();
593 if let Some(proposer) = &mut self.proposer {
594 proposer.set_propagation_scores(propagation_scores);
595 }
596
597 commits_until_update = self
598 .leader_schedule
599 .commits_until_leader_schedule_update(self.dag_state.clone());
600
601 fail_point!("consensus-after-leader-schedule-change");
602 }
603 assert!(commits_until_update > 0);
604
605 let (certified_leaders, decided_certified_commits): (
608 Vec<DecidedLeader>,
609 Vec<CertifiedCommit>,
610 ) = self
611 .try_select_certified_leaders(&mut certified_commits, commits_until_update)
612 .into_iter()
613 .unzip();
614
615 let blocks = decided_certified_commits
622 .iter()
623 .flat_map(|c| c.blocks())
624 .cloned()
625 .collect::<Vec<_>>();
626 self.accept_committed_blocks(blocks);
627
628 let (decided_leaders, local) = if certified_leaders.is_empty() {
630 let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
632 if decided_leaders.len() >= commits_until_update {
634 let _ = decided_leaders.split_off(commits_until_update);
635 }
636 (decided_leaders, true)
637 } else {
638 (certified_leaders, false)
639 };
640
641 let Some(last_decided) = decided_leaders.last().cloned() else {
643 break;
644 };
645
646 self.last_decided_leader = last_decided.slot();
647 self.context
648 .metrics
649 .node_metrics
650 .last_decided_leader_round
651 .set(self.last_decided_leader.round as i64);
652
653 let sequenced_leaders = decided_leaders
654 .into_iter()
655 .filter_map(|leader| leader.into_committed_block())
656 .collect::<Vec<_>>();
657 if sequenced_leaders.is_empty() {
660 break;
661 }
662 tracing::info!(
663 "Committing {} leaders: {}; {} commits before next leader schedule change",
664 sequenced_leaders.len(),
665 sequenced_leaders
666 .iter()
667 .map(|b| b.reference().to_string())
668 .join(","),
669 commits_until_update,
670 );
671
672 let subdags = self
674 .commit_observer
675 .handle_commit(sequenced_leaders, local)?;
676
677 self.block_manager
679 .try_unsuspend_blocks_for_latest_gc_round();
680
681 committed_sub_dags.extend(subdags);
682
683 fail_point!("consensus-after-handle-commit");
684 }
685
686 for sub_dag in &committed_sub_dags {
688 if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
689 assert_eq!(
690 commit_ref, sub_dag.commit_ref,
691 "Certified commit has different reference than the committed sub dag"
692 );
693 }
694 }
695
696 let committed_block_refs = committed_sub_dags
698 .iter()
699 .flat_map(|sub_dag| sub_dag.blocks.iter())
700 .filter_map(|block| {
701 (block.author() == self.context.own_index).then_some(block.reference())
702 })
703 .collect::<Vec<_>>();
704 if let Some(proposer) = &self.proposer {
705 proposer.notify_own_blocks_committed(
706 committed_block_refs,
707 self.dag_state.read().gc_round(),
708 );
709 }
710
711 Ok(committed_sub_dags)
712 }
713
714 pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
715 let _scope = monitored_scope("Core::get_missing_blocks");
716 self.block_manager.missing_blocks()
717 }
718
719 pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
721 info!("Propagation round delay set to: {delay}");
722 if let Some(proposer) = &mut self.proposer {
723 proposer.set_propagation_delay(delay);
724 }
725 }
726
727 pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
731 if let Some(proposer) = &mut self.proposer {
732 if proposer.get_last_known_proposed_round().is_some() {
733 panic!(
734 "Should not attempt to set the last known proposed round if that has been already set"
735 );
736 }
737 proposer.set_last_known_proposed_round(round);
738 info!("Last known proposed round set to {round}");
739 }
740 }
741
742 pub(crate) fn should_propose(&self) -> bool {
745 self.proposer
746 .as_ref()
747 .map(|p| p.should_propose())
748 .unwrap_or(false)
749 }
750
751 pub(crate) fn last_proposed_round(&self) -> Option<Round> {
753 self.proposer.as_ref().map(|p| p.last_proposed_round())
754 }
755
756 #[tracing::instrument(skip_all)]
762 fn try_select_certified_leaders(
763 &mut self,
764 certified_commits: &mut Vec<CertifiedCommit>,
765 limit: usize,
766 ) -> Vec<(DecidedLeader, CertifiedCommit)> {
767 assert!(limit > 0, "limit should be greater than 0");
768 if certified_commits.is_empty() {
769 return vec![];
770 }
771
772 let to_commit = if certified_commits.len() >= limit {
773 certified_commits.drain(..limit).collect::<Vec<_>>()
775 } else {
776 std::mem::take(certified_commits)
778 };
779
780 tracing::debug!(
781 "Selected {} certified leaders: {}",
782 to_commit.len(),
783 to_commit.iter().map(|c| c.leader().to_string()).join(",")
784 );
785
786 to_commit
787 .into_iter()
788 .map(|commit| {
789 let leader = commit.blocks().last().expect("Certified commit should have at least one block");
790 assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
791 let leader = DecidedLeader::Commit(leader.clone(), false);
793 UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
794 (leader, commit)
795 })
796 .collect::<Vec<_>>()
797 }
798
799 #[cfg(test)]
801 pub(crate) fn last_proposed_block(&self) -> VerifiedBlock {
802 self.proposer
803 .as_ref()
804 .expect("Proposer should be present")
805 .last_proposed_block()
806 }
807
808 #[cfg(test)]
811 pub(crate) fn round_tracker_for_tests(&self) -> Arc<RwLock<RoundTracker>> {
812 self.proposer
813 .as_ref()
814 .expect("Proposer should be present")
815 .round_tracker_for_tests()
816 }
817}
818
819pub(crate) struct CoreSignals {
821 tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
822 tx_accepted_block_broadcast: broadcast::Sender<VerifiedBlock>,
823 new_round_sender: watch::Sender<Round>,
824 context: Arc<Context>,
825}
826
827impl CoreSignals {
828 pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
829 let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
833 context.parameters.dag_state_cached_rounds as usize,
834 );
835 let (tx_accepted_block_broadcast, rx_accepted_block_broadcast) =
836 broadcast::channel::<VerifiedBlock>(
837 2 * context.parameters.dag_state_cached_rounds as usize * context.committee.size(),
838 );
839 let (new_round_sender, new_round_receiver) = watch::channel(0);
840
841 let me = Self {
842 tx_block_broadcast,
843 tx_accepted_block_broadcast,
844 new_round_sender,
845 context,
846 };
847
848 let receivers = CoreSignalsReceivers {
849 rx_block_broadcast,
850 rx_accepted_block_broadcast,
851 new_round_receiver,
852 };
853
854 (me, receivers)
855 }
856
857 pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
860 if self.context.committee.size() > 1 {
863 if extended_block.block.round() == GENESIS_ROUND {
864 debug!("Ignoring broadcasting genesis block to peers");
865 return Ok(());
866 }
867
868 if let Err(err) = self.tx_block_broadcast.send(extended_block) {
869 warn!("Couldn't broadcast the block to any receiver: {err}");
870 return Err(ConsensusError::Shutdown);
871 }
872 } else {
873 debug!(
874 "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
875 );
876 }
877 Ok(())
878 }
879
880 pub(crate) fn new_accepted_block(&self, block: VerifiedBlock) {
885 let _ = self.tx_accepted_block_broadcast.send(block);
887 }
888
889 pub(crate) fn new_round(&mut self, round_number: Round) {
892 let _ = self.new_round_sender.send_replace(round_number);
893 }
894}
895
896pub(crate) struct CoreSignalsReceivers {
899 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
900 rx_accepted_block_broadcast: broadcast::Receiver<VerifiedBlock>,
901 new_round_receiver: watch::Receiver<Round>,
902}
903
904impl CoreSignalsReceivers {
905 pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
906 self.rx_block_broadcast.resubscribe()
907 }
908
909 pub(crate) fn accepted_block_broadcast_receiver(&self) -> broadcast::Receiver<VerifiedBlock> {
910 self.rx_accepted_block_broadcast.resubscribe()
911 }
912
913 pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
914 self.new_round_receiver.clone()
915 }
916}
917
918#[cfg(test)]
921pub(crate) async fn create_cores(
922 context: Context,
923 authorities: Vec<Stake>,
924) -> Vec<CoreTestFixture> {
925 let mut cores = Vec::new();
926
927 for index in 0..authorities.len() {
928 let own_index = AuthorityIndex::new_for_test(index as u32);
929 let core =
930 CoreTestFixture::new(context.clone(), authorities.clone(), own_index, false).await;
931 cores.push(core);
932 }
933 cores
934}
935
936#[cfg(test)]
937pub(crate) struct CoreTestFixture {
938 pub(crate) core: Core,
939 pub(crate) transaction_vote_tracker: TransactionVoteTracker,
940 pub(crate) signal_receivers: CoreSignalsReceivers,
941 pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
942 pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
943 pub(crate) dag_state: Arc<RwLock<DagState>>,
944 pub(crate) store: Arc<MemStore>,
945 pub(crate) transaction_client: TransactionClient,
946}
947
948#[cfg(test)]
949impl CoreTestFixture {
950 async fn new(
951 context: Context,
952 authorities: Vec<Stake>,
953 own_index: AuthorityIndex,
954 sync_last_known_own_block: bool,
955 ) -> Self {
956 Self::new_with_prepopulated_blocks(
957 context,
958 authorities,
959 own_index,
960 sync_last_known_own_block,
961 vec![],
962 )
963 .await
964 }
965
966 async fn new_with_prepopulated_blocks(
970 context: Context,
971 authorities: Vec<Stake>,
972 own_index: AuthorityIndex,
973 sync_last_known_own_block: bool,
974 prepopulated_blocks: Vec<VerifiedBlock>,
975 ) -> Self {
976 let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
977 let mut context = context.clone();
978 context = context
979 .with_committee(committee)
980 .with_authority_index(own_index);
981 context
982 .protocol_config
983 .set_bad_nodes_stake_threshold_for_testing(33);
984
985 let context = Arc::new(context);
986 let store = Arc::new(MemStore::new());
987 if !prepopulated_blocks.is_empty() {
988 store
989 .write(WriteBatch::default().blocks(prepopulated_blocks))
990 .expect("Storage error");
991 }
992 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
993
994 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
995 let leader_schedule = Arc::new(
996 LeaderSchedule::from_store(context.clone(), dag_state.clone())
997 .with_num_commits_per_schedule(10),
998 );
999 let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1000 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1001 let transaction_vote_tracker = TransactionVoteTracker::new(
1002 context.clone(),
1003 Arc::new(NoopBlockVerifier {}),
1004 dag_state.clone(),
1005 );
1006 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1007 let block_receiver = signal_receivers.block_broadcast_receiver();
1009
1010 let (commit_consumer, commit_output_receiver) = CommitConsumerArgs::new(0, 0);
1011 let commit_observer = CommitObserver::new(
1012 context.clone(),
1013 commit_consumer,
1014 dag_state.clone(),
1015 transaction_vote_tracker.clone(),
1016 )
1017 .await;
1018
1019 let block_signer = signers.remove(own_index.value()).1;
1020
1021 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1022 let core = Core::new_validator(
1023 context.clone(),
1024 leader_schedule,
1025 transaction_consumer,
1026 transaction_vote_tracker.clone(),
1027 block_manager,
1028 commit_observer,
1029 signals,
1030 block_signer,
1031 dag_state.clone(),
1032 sync_last_known_own_block,
1033 round_tracker.clone(),
1034 );
1035
1036 Self {
1037 core,
1038 transaction_vote_tracker,
1039 signal_receivers,
1040 block_receiver,
1041 _commit_output_receiver: commit_output_receiver,
1042 dag_state,
1043 store,
1044 transaction_client,
1045 }
1046 }
1047
1048 pub(crate) fn add_blocks(
1049 &mut self,
1050 blocks: Vec<VerifiedBlock>,
1051 ) -> ConsensusResult<BTreeSet<BlockRef>> {
1052 self.transaction_vote_tracker
1053 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1054 self.core.add_blocks(blocks)
1055 }
1056}
1057
1058#[cfg(test)]
1059mod test {
1060 use std::{collections::BTreeSet, iter, time::Duration};
1061
1062 use consensus_config::{AuthorityIndex, Parameters};
1063 use consensus_types::block::BlockTimestampMs;
1064 use futures::{StreamExt, stream::FuturesUnordered};
1065 use tokio::time::sleep;
1066
1067 use super::*;
1068 use crate::{
1069 CommitConsumerArgs, CommitIndex,
1070 block::{TestBlock, genesis_blocks},
1071 block_verifier::NoopBlockVerifier,
1072 commit::CommitAPI,
1073 leader_scoring::ReputationScores,
1074 storage::{Store, WriteBatch, mem_store::MemStore},
1075 test_dag_builder::DagBuilder,
1076 test_dag_parser::parse_dag,
1077 transaction::{BlockStatus, TransactionClient},
1078 };
1079
1080 #[tokio::test]
1082 async fn test_core_recover_from_store_for_full_round() {
1083 telemetry_subscribers::init_for_testing();
1084 let (context, mut key_pairs) = Context::new_for_test(4);
1085 let context = Arc::new(context);
1086 let store = Arc::new(MemStore::new());
1087 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1088 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1089 let mut block_status_subscriptions = FuturesUnordered::new();
1090
1091 let mut last_round_blocks = genesis_blocks(&context);
1093 let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1094 for round in 1..=4 {
1095 let mut this_round_blocks = Vec::new();
1096 for (index, _authority) in context.committee.authorities() {
1097 let block = VerifiedBlock::new_for_test(
1098 TestBlock::new(round, index.value() as u32)
1099 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1100 .build(),
1101 );
1102
1103 if round == 1 && index == context.own_index {
1105 let subscription =
1106 transaction_consumer.subscribe_for_block_status_testing(block.reference());
1107 block_status_subscriptions.push(subscription);
1108 }
1109
1110 this_round_blocks.push(block);
1111 }
1112 all_blocks.extend(this_round_blocks.clone());
1113 last_round_blocks = this_round_blocks;
1114 }
1115 store
1117 .write(WriteBatch::default().blocks(all_blocks))
1118 .expect("Storage error");
1119
1120 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1122 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1123 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1124 context.clone(),
1125 dag_state.clone(),
1126 ));
1127 let transaction_vote_tracker = TransactionVoteTracker::new(
1128 context.clone(),
1129 Arc::new(NoopBlockVerifier {}),
1130 dag_state.clone(),
1131 );
1132
1133 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1134 let commit_observer = CommitObserver::new(
1135 context.clone(),
1136 commit_consumer,
1137 dag_state.clone(),
1138 transaction_vote_tracker.clone(),
1139 )
1140 .await;
1141
1142 let last_commit = store.read_last_commit().unwrap();
1144 assert!(last_commit.is_none());
1145 assert_eq!(dag_state.read().last_commit_index(), 0);
1146
1147 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1149 let transaction_vote_tracker = TransactionVoteTracker::new(
1150 context.clone(),
1151 Arc::new(NoopBlockVerifier {}),
1152 dag_state.clone(),
1153 );
1154 transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
1155 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1157 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1158 let _core = Core::new_validator(
1159 context.clone(),
1160 leader_schedule,
1161 transaction_consumer,
1162 transaction_vote_tracker.clone(),
1163 block_manager,
1164 commit_observer,
1165 signals,
1166 key_pairs.remove(context.own_index.value()).1,
1167 dag_state.clone(),
1168 false,
1169 round_tracker,
1170 );
1171
1172 let mut new_round = signal_receivers.new_round_receiver();
1174 assert_eq!(*new_round.borrow_and_update(), 5);
1175
1176 let proposed_block = block_receiver
1178 .recv()
1179 .await
1180 .expect("A block should have been created");
1181 assert_eq!(proposed_block.block.round(), 5);
1182 let ancestors = proposed_block.block.ancestors();
1183
1184 assert_eq!(ancestors.len(), 4);
1186 for ancestor in ancestors {
1187 assert_eq!(ancestor.round, 4);
1188 }
1189
1190 dag_state.write().flush();
1192
1193 let last_commit = store
1197 .read_last_commit()
1198 .unwrap()
1199 .expect("last commit should be set");
1200 assert_eq!(last_commit.index(), 2);
1201 assert_eq!(dag_state.read().last_commit_index(), 2);
1202 let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1203 assert_eq!(all_stored_commits.len(), 2);
1204
1205 while let Some(result) = block_status_subscriptions.next().await {
1207 let status = result.unwrap();
1208 assert!(matches!(status, BlockStatus::Sequenced(_)));
1209 }
1210 }
1211
1212 #[tokio::test]
1215 async fn test_core_recover_from_store_for_partial_round() {
1216 telemetry_subscribers::init_for_testing();
1217
1218 let (context, _) = Context::new_for_test(4);
1219
1220 let mut last_round_blocks = genesis_blocks(&context);
1222 let mut all_blocks = last_round_blocks.clone();
1223 for round in 1..=4 {
1224 let mut this_round_blocks = Vec::new();
1225
1226 let authorities_to_skip = if round == 4 {
1228 context.committee.validity_threshold() as usize
1229 } else {
1230 1
1232 };
1233
1234 for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1235 let block = TestBlock::new(round, index.value() as u32)
1236 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1237 .build();
1238 this_round_blocks.push(VerifiedBlock::new_for_test(block));
1239 }
1240 all_blocks.extend(this_round_blocks.clone());
1241 last_round_blocks = this_round_blocks;
1242 }
1243
1244 let mut fixture = CoreTestFixture::new_with_prepopulated_blocks(
1245 context,
1246 vec![1, 1, 1, 1],
1247 AuthorityIndex::new_for_test(0),
1248 false,
1249 all_blocks,
1250 )
1251 .await;
1252
1253 let mut new_round = fixture.signal_receivers.new_round_receiver();
1256 assert_eq!(*new_round.borrow_and_update(), 5);
1257
1258 let proposed_block = fixture
1260 .block_receiver
1261 .recv()
1262 .await
1263 .expect("A block should have been created");
1264 assert_eq!(proposed_block.block.round(), 4);
1265 let ancestors = proposed_block.block.ancestors();
1266
1267 assert_eq!(ancestors.len(), 4);
1268 let own_index = fixture.core.context.own_index;
1269 for ancestor in ancestors {
1270 if ancestor.author == own_index {
1271 assert_eq!(ancestor.round, 0);
1272 } else {
1273 assert_eq!(ancestor.round, 3);
1274 }
1275 }
1276
1277 fixture.core.try_commit(vec![]).ok();
1279
1280 fixture.core.dag_state.write().flush();
1282
1283 let last_commit = fixture
1287 .store
1288 .read_last_commit()
1289 .unwrap()
1290 .expect("last commit should be set");
1291 assert_eq!(last_commit.index(), 2);
1292 assert_eq!(fixture.dag_state.read().last_commit_index(), 2);
1293 let all_stored_commits = fixture
1294 .store
1295 .scan_commits((0..=CommitIndex::MAX).into())
1296 .unwrap();
1297 assert_eq!(all_stored_commits.len(), 2);
1298 }
1299
1300 #[tokio::test]
1301 async fn test_core_propose_after_genesis() {
1302 telemetry_subscribers::init_for_testing();
1303 let (mut context, _) = Context::new_for_test(4);
1304 context
1305 .protocol_config
1306 .set_max_transaction_size_bytes_for_testing(2_000);
1307 context
1308 .protocol_config
1309 .set_max_transactions_in_block_bytes_for_testing(2_000);
1310
1311 let mut fixture = CoreTestFixture::new(
1312 context,
1313 vec![1, 1, 1, 1],
1314 AuthorityIndex::new_for_test(0),
1315 false,
1316 )
1317 .await;
1318
1319 let mut total = 0;
1321 let mut index = 0;
1322 loop {
1323 let transaction =
1324 bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1325 total += transaction.len();
1326 index += 1;
1327 let _w = fixture
1328 .transaction_client
1329 .submit_no_wait(vec![transaction])
1330 .await
1331 .unwrap();
1332
1333 if total >= 1_000 {
1335 break;
1336 }
1337 }
1338
1339 let extended_block = fixture
1341 .block_receiver
1342 .recv()
1343 .await
1344 .expect("A new block should have been created");
1345
1346 assert_eq!(extended_block.block.round(), 1);
1348 assert_eq!(extended_block.block.author().value(), 0);
1349 assert_eq!(extended_block.block.ancestors().len(), 4);
1350
1351 let mut total = 0;
1352 for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1353 total += transaction.data().len() as u64;
1354 let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1355 assert_eq!(format!("Transaction {i}"), transaction);
1356 }
1357 assert!(
1358 total
1359 <= fixture
1360 .core
1361 .context
1362 .protocol_config
1363 .max_transactions_in_block_bytes()
1364 );
1365
1366 let all_genesis = genesis_blocks(&fixture.core.context);
1368
1369 for ancestor in extended_block.block.ancestors() {
1370 all_genesis
1371 .iter()
1372 .find(|block| block.reference() == *ancestor)
1373 .expect("Block should be found amongst genesis blocks");
1374 }
1375
1376 assert!(fixture.core.try_propose(false).unwrap().is_none());
1378 assert!(fixture.core.try_propose(true).unwrap().is_none());
1379
1380 fixture.dag_state.write().flush();
1382
1383 let last_commit = fixture.store.read_last_commit().unwrap();
1385 assert!(last_commit.is_none());
1386 assert_eq!(fixture.dag_state.read().last_commit_index(), 0);
1387 }
1388
1389 #[tokio::test]
1390 async fn test_core_propose_once_receiving_a_quorum() {
1391 telemetry_subscribers::init_for_testing();
1392 let (context, _key_pairs) = Context::new_for_test(4);
1393 let mut core_fixture = CoreTestFixture::new(
1394 context.clone(),
1395 vec![1, 1, 1, 1],
1396 AuthorityIndex::new_for_test(0),
1397 false,
1398 )
1399 .await;
1400 let transaction_vote_tracker = &core_fixture.transaction_vote_tracker;
1401 let store = &core_fixture.store;
1402 let dag_state = &core_fixture.dag_state;
1403 let core = &mut core_fixture.core;
1404
1405 let mut expected_ancestors = BTreeSet::new();
1406
1407 let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1409 expected_ancestors.insert(block_1.reference());
1410 sleep(context.parameters.min_round_delay).await;
1412 transaction_vote_tracker.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1414 _ = core.add_blocks(vec![block_1]);
1415
1416 assert_eq!(core.last_proposed_round(), Some(1));
1417 expected_ancestors.insert(core.last_proposed_block().reference());
1418 assert!(core.try_propose(false).unwrap().is_none());
1420
1421 let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1423 expected_ancestors.insert(block_2.reference());
1424 sleep(context.parameters.min_round_delay).await;
1426 transaction_vote_tracker.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1428 _ = core.add_blocks(vec![block_2.clone()]);
1429
1430 assert_eq!(core.last_proposed_round(), Some(2));
1431
1432 let proposed_block = core.last_proposed_block();
1433 assert_eq!(proposed_block.round(), 2);
1434 assert_eq!(proposed_block.author(), context.own_index);
1435 assert_eq!(proposed_block.ancestors().len(), 3);
1436 let ancestors = proposed_block.ancestors();
1437 let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1438 assert_eq!(ancestors, expected_ancestors);
1439
1440 let transaction_votes = proposed_block.transaction_votes();
1441 assert_eq!(transaction_votes.len(), 1);
1442 let transaction_vote = transaction_votes.first().unwrap();
1443 assert_eq!(transaction_vote.block_ref, block_2.reference());
1444 assert_eq!(transaction_vote.rejects, vec![1, 4]);
1445
1446 dag_state.write().flush();
1448
1449 let last_commit = store.read_last_commit().unwrap();
1451 assert!(last_commit.is_none());
1452 assert_eq!(dag_state.read().last_commit_index(), 0);
1453 }
1454
1455 #[tokio::test]
1456 async fn test_commit_and_notify_for_block_status() {
1457 telemetry_subscribers::init_for_testing();
1458 let (mut context, mut key_pairs) = Context::new_for_test(4);
1459 const GC_DEPTH: u32 = 2;
1460
1461 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1462
1463 let context = Arc::new(context);
1464
1465 let store = Arc::new(MemStore::new());
1466 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1467 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1468 let mut block_status_subscriptions = FuturesUnordered::new();
1469
1470 let dag_str = "DAG {
1471 Round 0 : { 4 },
1472 Round 1 : { * },
1473 Round 2 : { * },
1474 Round 3 : {
1475 A -> [*],
1476 B -> [-A2],
1477 C -> [-A2],
1478 D -> [-A2],
1479 },
1480 Round 4 : {
1481 B -> [-A3],
1482 C -> [-A3],
1483 D -> [-A3],
1484 },
1485 Round 5 : {
1486 A -> [A3, B4, C4, D4]
1487 B -> [*],
1488 C -> [*],
1489 D -> [*],
1490 },
1491 Round 6 : { * },
1492 Round 7 : { * },
1493 Round 8 : { * },
1494 }";
1495
1496 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1497 dag_builder.print();
1498
1499 for block in dag_builder.blocks(1..=5) {
1501 if block.author() == context.own_index {
1502 let subscription =
1503 transaction_consumer.subscribe_for_block_status_testing(block.reference());
1504 block_status_subscriptions.push(subscription);
1505 }
1506 }
1507
1508 store
1510 .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
1511 .expect("Storage error");
1512
1513 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1515 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1516 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1517 context.clone(),
1518 dag_state.clone(),
1519 ));
1520 let transaction_vote_tracker = TransactionVoteTracker::new(
1521 context.clone(),
1522 Arc::new(NoopBlockVerifier {}),
1523 dag_state.clone(),
1524 );
1525
1526 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1527 let commit_observer = CommitObserver::new(
1528 context.clone(),
1529 commit_consumer,
1530 dag_state.clone(),
1531 transaction_vote_tracker.clone(),
1532 )
1533 .await;
1534
1535 dag_state.write().flush();
1537
1538 let last_commit = store.read_last_commit().unwrap();
1540 assert!(last_commit.is_none());
1541 assert_eq!(dag_state.read().last_commit_index(), 0);
1542
1543 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1545 let transaction_vote_tracker = TransactionVoteTracker::new(
1546 context.clone(),
1547 Arc::new(NoopBlockVerifier {}),
1548 dag_state.clone(),
1549 );
1550 transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
1551 let _block_receiver = signal_receivers.block_broadcast_receiver();
1553 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1554 let _core = Core::new_validator(
1555 context.clone(),
1556 leader_schedule,
1557 transaction_consumer,
1558 transaction_vote_tracker,
1559 block_manager,
1560 commit_observer,
1561 signals,
1562 key_pairs.remove(context.own_index.value()).1,
1563 dag_state.clone(),
1564 false,
1565 round_tracker,
1566 );
1567
1568 dag_state.write().flush();
1570
1571 let last_commit = store
1572 .read_last_commit()
1573 .unwrap()
1574 .expect("last commit should be set");
1575
1576 assert_eq!(last_commit.index(), 5);
1577
1578 while let Some(result) = block_status_subscriptions.next().await {
1579 let status = result.unwrap();
1580
1581 match status {
1582 BlockStatus::Sequenced(block_ref) => {
1583 assert!(block_ref.round == 1 || block_ref.round == 5);
1584 }
1585 BlockStatus::GarbageCollected(block_ref) => {
1586 assert!(block_ref.round == 2 || block_ref.round == 3);
1587 }
1588 }
1589 }
1590 }
1591
1592 #[tokio::test]
1595 async fn test_multiple_commits_advance_threshold_clock() {
1596 telemetry_subscribers::init_for_testing();
1597 let (mut context, _) = Context::new_for_test(4);
1598 const GC_DEPTH: u32 = 2;
1599
1600 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1601
1602 let dag_str = "DAG {
1606 Round 0 : { 4 },
1607 Round 1 : { * },
1608 Round 2 : {
1609 B -> [-D1],
1610 C -> [-D1],
1611 D -> [-D1],
1612 },
1613 Round 3 : {
1614 B -> [*],
1615 C -> [*]
1616 D -> [*],
1617 },
1618 Round 4 : {
1619 A -> [*],
1620 B -> [*],
1621 C -> [*]
1622 D -> [*],
1623 },
1624 Round 5 : {
1625 A -> [*],
1626 B -> [*],
1627 C -> [*],
1628 D -> [*],
1629 },
1630 Round 6 : {
1631 B -> [A5, B5, C5, D1],
1632 C -> [A5, B5, C5, D1],
1633 D -> [A5, B5, C5, D1],
1634 },
1635 Round 7 : {
1636 B -> [*],
1637 C -> [*],
1638 D -> [*],
1639 },
1640 Round 8 : {
1641 B -> [*],
1642 C -> [*],
1643 D -> [*],
1644 },
1645 Round 9 : {
1646 B -> [*],
1647 C -> [*],
1648 D -> [*],
1649 },
1650 Round 10 : {
1651 B -> [*],
1652 C -> [*],
1653 D -> [*],
1654 },
1655 Round 11 : {
1656 B -> [*],
1657 C -> [*],
1658 D -> [*],
1659 },
1660 }";
1661
1662 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1663 dag_builder.print();
1664
1665 let mut fixture = CoreTestFixture::new(
1666 context,
1667 vec![1, 1, 1, 1],
1668 AuthorityIndex::new_for_test(0),
1669 true,
1670 )
1671 .await;
1672
1673 let last_commit = fixture.store.read_last_commit().unwrap();
1675 assert!(last_commit.is_none());
1676 assert_eq!(fixture.dag_state.read().last_commit_index(), 0);
1677
1678 fixture.core.set_last_known_proposed_round(4);
1681
1682 let mut all_blocks = dag_builder.blocks(1..=11);
1688 all_blocks.sort_by_key(|b| b.round());
1689 fixture
1692 .transaction_vote_tracker
1693 .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1694 let blocks: Vec<VerifiedBlock> = all_blocks
1695 .into_iter()
1696 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
1697 .collect();
1698 fixture.core.add_blocks(blocks).expect("Should not fail");
1699
1700 assert_eq!(fixture.core.last_proposed_round(), Some(12));
1701 }
1702
1703 #[tokio::test]
1704 async fn test_core_set_min_propose_round() {
1705 telemetry_subscribers::init_for_testing();
1706 let (context, _) = Context::new_for_test(4);
1707 let context = context.with_parameters(Parameters {
1708 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1709 ..Default::default()
1710 });
1711 let mut fixture = CoreTestFixture::new(
1712 context,
1713 vec![1, 1, 1, 1],
1714 AuthorityIndex::new_for_test(0),
1715 true,
1716 )
1717 .await;
1718
1719 assert_eq!(
1721 fixture.core.last_proposed_round(),
1722 Some(GENESIS_ROUND),
1723 "No block should have been created other than genesis"
1724 );
1725
1726 assert!(fixture.core.try_propose(true).unwrap().is_none());
1728
1729 let mut builder = DagBuilder::new(fixture.core.context.clone());
1731 builder.layers(1..=10).build();
1732
1733 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
1734
1735 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
1737
1738 fixture
1739 .core
1740 .round_tracker_for_tests()
1741 .write()
1742 .update_from_probe(
1743 vec![
1744 vec![10, 10, 10, 10],
1745 vec![10, 10, 10, 10],
1746 vec![10, 10, 10, 10],
1747 vec![10, 10, 10, 10],
1748 ],
1749 vec![
1750 vec![10, 10, 10, 10],
1751 vec![10, 10, 10, 10],
1752 vec![10, 10, 10, 10],
1753 vec![10, 10, 10, 10],
1754 ],
1755 );
1756
1757 assert!(fixture.core.try_propose(true).unwrap().is_none());
1759
1760 fixture.core.set_last_known_proposed_round(10);
1763
1764 let block = fixture.core.try_propose(true).expect("No error").unwrap();
1765 assert_eq!(block.round(), 11);
1766 assert_eq!(block.ancestors().len(), 4);
1767
1768 let our_ancestor_included = block.ancestors()[0];
1769 assert_eq!(our_ancestor_included.author, fixture.core.context.own_index);
1770 assert_eq!(our_ancestor_included.round, 10);
1771 }
1772
1773 #[tokio::test(flavor = "current_thread", start_paused = true)]
1774 async fn test_core_try_new_block_leader_timeout() {
1775 telemetry_subscribers::init_for_testing();
1776
1777 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
1784 let now = context.clock.timestamp_utc_ms();
1786 let max_timestamp = blocks
1787 .iter()
1788 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
1789 .map(|block| block.timestamp_ms())
1790 .unwrap_or(0);
1791
1792 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
1793 sleep(wait_time).await;
1794 }
1795
1796 let (context, _) = Context::new_for_test(4);
1797 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
1799
1800 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
1805
1806 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
1808 for round in 1..=3 {
1809 let mut this_round_blocks = Vec::new();
1810
1811 for core_fixture in cores.iter_mut() {
1812 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1813
1814 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1815
1816 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1818 assert_eq!(round - 1, r);
1819 if core_fixture.core.last_proposed_round() == Some(r) {
1820 core_fixture
1822 .core
1823 .try_propose(true)
1824 .unwrap()
1825 .unwrap_or_else(|| {
1826 panic!("Block should have been proposed for round {}", round)
1827 });
1828 }
1829 }
1830
1831 assert_eq!(core_fixture.core.last_proposed_round(), Some(round));
1832
1833 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
1834 }
1835
1836 last_round_blocks = this_round_blocks;
1837 }
1838
1839 for core_fixture in cores.iter_mut() {
1842 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1843
1844 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1845 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
1846 }
1847
1848 for core_fixture in cores.iter_mut() {
1851 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
1852 assert_eq!(core_fixture.core.last_proposed_round(), Some(4));
1853
1854 core_fixture.dag_state.write().flush();
1856
1857 let last_commit = core_fixture
1859 .store
1860 .read_last_commit()
1861 .unwrap()
1862 .expect("last commit should be set");
1863 assert_eq!(last_commit.index(), 1);
1866 let all_stored_commits = core_fixture
1867 .store
1868 .scan_commits((0..=CommitIndex::MAX).into())
1869 .unwrap();
1870 assert_eq!(all_stored_commits.len(), 1);
1871 }
1872 }
1873
1874 #[tokio::test(flavor = "current_thread", start_paused = true)]
1875 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
1876 telemetry_subscribers::init_for_testing();
1877
1878 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
1885 let now = context.clock.timestamp_utc_ms();
1887 let max_timestamp = blocks
1888 .iter()
1889 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
1890 .map(|block| block.timestamp_ms())
1891 .unwrap_or(0);
1892
1893 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
1894 sleep(wait_time).await;
1895 }
1896
1897 let (mut context, _) = Context::new_for_test(5);
1898 context
1899 .protocol_config
1900 .set_bad_nodes_stake_threshold_for_testing(33);
1901
1902 let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
1904 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
1905
1906 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
1908 for round in 1..=30 {
1909 let mut this_round_blocks = Vec::new();
1910
1911 for core_fixture in cores.iter_mut() {
1912 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1913
1914 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1915
1916 core_fixture
1917 .core
1918 .round_tracker_for_tests()
1919 .write()
1920 .update_from_probe(
1921 vec![
1922 vec![round, round, round, round, 0],
1923 vec![round, round, round, round, 0],
1924 vec![round, round, round, round, 0],
1925 vec![round, round, round, round, 0],
1926 vec![0, 0, 0, 0, 0],
1927 ],
1928 vec![
1929 vec![round, round, round, round, 0],
1930 vec![round, round, round, round, 0],
1931 vec![round, round, round, round, 0],
1932 vec![round, round, round, round, 0],
1933 vec![0, 0, 0, 0, 0],
1934 ],
1935 );
1936
1937 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1939 assert_eq!(round - 1, r);
1940 if core_fixture.core.last_proposed_round() == Some(r) {
1941 core_fixture
1943 .core
1944 .try_propose(true)
1945 .unwrap()
1946 .unwrap_or_else(|| {
1947 panic!("Block should have been proposed for round {}", round)
1948 });
1949 }
1950 }
1951
1952 assert_eq!(core_fixture.core.last_proposed_round(), Some(round));
1953
1954 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
1955 }
1956
1957 last_round_blocks = this_round_blocks;
1958 }
1959
1960 for round in 31..=40 {
1962 let mut this_round_blocks = Vec::new();
1963
1964 for core_fixture in all_cores.iter_mut() {
1965 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1966
1967 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1968
1969 core_fixture
1972 .core
1973 .round_tracker_for_tests()
1974 .write()
1975 .update_from_probe(
1976 vec![
1977 vec![round, round, round, round, 0],
1978 vec![round, round, round, round, 0],
1979 vec![round, round, round, round, 0],
1980 vec![round, round, round, round, 0],
1981 vec![0, 0, 0, 0, 0],
1982 ],
1983 vec![
1984 vec![round, round, round, round, 0],
1985 vec![round, round, round, round, 0],
1986 vec![round, round, round, round, 0],
1987 vec![round, round, round, round, 0],
1988 vec![0, 0, 0, 0, 0],
1989 ],
1990 );
1991
1992 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1994 assert_eq!(round - 1, r);
1995 if core_fixture.core.last_proposed_round() == Some(r) {
1996 core_fixture
1998 .core
1999 .try_propose(true)
2000 .unwrap()
2001 .unwrap_or_else(|| {
2002 panic!("Block should have been proposed for round {}", round)
2003 });
2004 }
2005 }
2006
2007 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2008
2009 for block in this_round_blocks.iter() {
2010 if block.author() != AuthorityIndex::new_for_test(4) {
2011 assert_eq!(block.ancestors().len(), 4);
2014 } else {
2015 assert_eq!(block.ancestors().len(), 5);
2018 }
2019 }
2020 }
2021
2022 last_round_blocks = this_round_blocks;
2023 }
2024 }
2025
2026 #[tokio::test]
2027 async fn test_smart_ancestor_selection() {
2028 telemetry_subscribers::init_for_testing();
2029 let (context, _) = Context::new_for_test(7);
2030 let context = context.with_parameters(Parameters {
2031 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2032 ..Default::default()
2033 });
2034 let mut fixture =
2035 CoreTestFixture::new(context, vec![1; 7], AuthorityIndex::new_for_test(0), true).await;
2036 let min_round_delay = fixture.core.context.parameters.min_round_delay;
2037
2038 assert_eq!(
2040 fixture.core.last_proposed_round(),
2041 Some(GENESIS_ROUND),
2042 "No block should have been created other than genesis"
2043 );
2044
2045 assert!(fixture.core.try_propose(true).unwrap().is_none());
2047
2048 let mut builder = DagBuilder::new(fixture.core.context.clone());
2050 builder
2051 .layers(1..=12)
2052 .authorities(vec![AuthorityIndex::new_for_test(1)])
2053 .skip_block()
2054 .build();
2055 let blocks = builder.blocks(1..=12);
2056 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2058 fixture.core.set_last_known_proposed_round(12);
2059
2060 fixture
2061 .core
2062 .round_tracker_for_tests()
2063 .write()
2064 .update_from_probe(
2065 vec![
2066 vec![12, 12, 12, 12, 12, 12, 12],
2067 vec![0, 0, 0, 0, 0, 0, 0],
2068 vec![12, 12, 12, 12, 12, 12, 12],
2069 vec![12, 12, 12, 12, 12, 12, 12],
2070 vec![12, 12, 12, 12, 12, 12, 12],
2071 vec![12, 12, 12, 12, 12, 12, 12],
2072 vec![12, 12, 12, 12, 12, 12, 12],
2073 ],
2074 vec![
2075 vec![12, 12, 12, 12, 12, 12, 12],
2076 vec![0, 0, 0, 0, 0, 0, 0],
2077 vec![12, 12, 12, 12, 12, 12, 12],
2078 vec![12, 12, 12, 12, 12, 12, 12],
2079 vec![12, 12, 12, 12, 12, 12, 12],
2080 vec![12, 12, 12, 12, 12, 12, 12],
2081 vec![12, 12, 12, 12, 12, 12, 12],
2082 ],
2083 );
2084
2085 let block = fixture.core.try_propose(true).expect("No error").unwrap();
2086 assert_eq!(block.round(), 13);
2087 assert_eq!(block.ancestors().len(), 7);
2088
2089 builder
2091 .layers(13..=14)
2092 .authorities(vec![AuthorityIndex::new_for_test(0)])
2093 .skip_block()
2094 .build();
2095 let blocks = builder.blocks(13..=14);
2096 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2097
2098 let block = fixture.core.try_propose(true).expect("No error").unwrap();
2101 assert_eq!(block.round(), 15);
2102 assert_eq!(block.ancestors().len(), 6);
2103
2104 let round_14_ancestors = builder.last_ancestors.clone();
2107 builder
2108 .layer(15)
2109 .authorities(vec![
2110 AuthorityIndex::new_for_test(0),
2111 AuthorityIndex::new_for_test(5),
2112 AuthorityIndex::new_for_test(6),
2113 ])
2114 .skip_block()
2115 .build();
2116 let blocks = builder.blocks(15..=15);
2117 let authority_1_excluded_block_reference = blocks
2118 .iter()
2119 .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2120 .unwrap()
2121 .reference();
2122 sleep(min_round_delay).await;
2124 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2126 assert_eq!(fixture.core.last_proposed_block().round(), 15);
2127
2128 builder
2129 .layer(15)
2130 .authorities(vec![
2131 AuthorityIndex::new_for_test(0),
2132 AuthorityIndex::new_for_test(1),
2133 AuthorityIndex::new_for_test(2),
2134 AuthorityIndex::new_for_test(3),
2135 AuthorityIndex::new_for_test(4),
2136 ])
2137 .skip_block()
2138 .override_last_ancestors(round_14_ancestors)
2139 .build();
2140 let blocks = builder.blocks(15..=15);
2141 let round_15_ancestors: Vec<BlockRef> = blocks
2142 .iter()
2143 .filter(|block| block.round() == 15)
2144 .map(|block| block.reference())
2145 .collect();
2146 let included_block_references = iter::once(&fixture.core.last_proposed_block())
2147 .chain(blocks.iter())
2148 .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2149 .map(|block| block.reference())
2150 .collect::<Vec<_>>();
2151
2152 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2154 assert_eq!(fixture.core.last_proposed_block().round(), 16);
2155
2156 let extended_block = loop {
2158 let extended_block =
2159 tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2160 .await
2161 .unwrap()
2162 .unwrap();
2163 if extended_block.block.round() == 16 {
2164 break extended_block;
2165 }
2166 };
2167 assert_eq!(extended_block.block.round(), 16);
2168 assert_eq!(
2169 extended_block.block.author(),
2170 fixture.core.context.own_index
2171 );
2172 assert_eq!(extended_block.block.ancestors().len(), 6);
2173 assert_eq!(extended_block.block.ancestors(), included_block_references);
2174 assert_eq!(extended_block.excluded_ancestors.len(), 1);
2175 assert_eq!(
2176 extended_block.excluded_ancestors[0],
2177 authority_1_excluded_block_reference
2178 );
2179
2180 builder
2185 .layer(16)
2186 .authorities(vec![
2187 AuthorityIndex::new_for_test(0),
2188 AuthorityIndex::new_for_test(5),
2189 AuthorityIndex::new_for_test(6),
2190 ])
2191 .skip_block()
2192 .override_last_ancestors(round_15_ancestors)
2193 .build();
2194 let blocks = builder.blocks(16..=16);
2195 sleep(min_round_delay).await;
2197 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2199 assert_eq!(fixture.core.last_proposed_block().round(), 16);
2200
2201 let block = fixture.core.try_propose(true).expect("No error").unwrap();
2204 assert_eq!(block.round(), 17);
2205 assert_eq!(block.ancestors().len(), 5);
2206
2207 let extended_block =
2209 tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2210 .await
2211 .unwrap()
2212 .unwrap();
2213 assert_eq!(extended_block.block.round(), 17);
2214 assert_eq!(
2215 extended_block.block.author(),
2216 fixture.core.context.own_index
2217 );
2218 assert_eq!(extended_block.block.ancestors().len(), 5);
2219 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2220
2221 builder
2224 .layers(17..=22)
2225 .authorities(vec![AuthorityIndex::new_for_test(0)])
2226 .skip_block()
2227 .build();
2228 let blocks = builder.blocks(17..=22);
2229
2230 fixture
2236 .core
2237 .round_tracker_for_tests()
2238 .write()
2239 .update_from_probe(
2240 vec![
2241 vec![22, 22, 22, 22, 22, 22, 22],
2242 vec![22, 22, 22, 22, 22, 22, 22],
2243 vec![22, 22, 22, 22, 22, 22, 22],
2244 vec![22, 22, 22, 22, 22, 22, 22],
2245 vec![22, 22, 22, 22, 22, 22, 22],
2246 vec![22, 22, 22, 22, 22, 22, 22],
2247 vec![22, 22, 22, 22, 22, 22, 22],
2248 ],
2249 vec![
2250 vec![22, 22, 22, 22, 22, 22, 22],
2251 vec![22, 22, 22, 22, 22, 22, 22],
2252 vec![22, 22, 22, 22, 22, 22, 22],
2253 vec![22, 22, 22, 22, 22, 22, 22],
2254 vec![22, 22, 22, 22, 22, 22, 22],
2255 vec![22, 22, 22, 22, 22, 22, 22],
2256 vec![22, 22, 22, 22, 22, 22, 22],
2257 ],
2258 );
2259
2260 let own_index = fixture.core.context.own_index;
2261 let included_block_references = iter::once(&fixture.core.last_proposed_block())
2262 .chain(blocks.iter())
2263 .filter(|block| block.round() == 22 || block.author() == own_index)
2264 .map(|block| block.reference())
2265 .collect::<Vec<_>>();
2266
2267 sleep(min_round_delay).await;
2269 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2270 assert_eq!(fixture.core.last_proposed_block().round(), 23);
2271
2272 let extended_block =
2274 tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2275 .await
2276 .unwrap()
2277 .unwrap();
2278 assert_eq!(extended_block.block.round(), 23);
2279 assert_eq!(
2280 extended_block.block.author(),
2281 fixture.core.context.own_index
2282 );
2283 assert_eq!(extended_block.block.ancestors().len(), 7);
2284 assert_eq!(extended_block.block.ancestors(), included_block_references);
2285 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2286 }
2287
2288 #[tokio::test]
2289 async fn test_excluded_ancestor_limit() {
2290 telemetry_subscribers::init_for_testing();
2291 let (context, _) = Context::new_for_test(4);
2292 let context = context.with_parameters(Parameters {
2293 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2294 ..Default::default()
2295 });
2296 let mut fixture = CoreTestFixture::new(
2297 context,
2298 vec![1, 1, 1, 1],
2299 AuthorityIndex::new_for_test(0),
2300 true,
2301 )
2302 .await;
2303
2304 assert_eq!(
2306 fixture.core.last_proposed_round(),
2307 Some(GENESIS_ROUND),
2308 "No block should have been created other than genesis"
2309 );
2310
2311 let mut builder = DagBuilder::new(fixture.core.context.clone());
2313 builder.layers(1..=3).build();
2314
2315 builder
2319 .layer(4)
2320 .authorities(vec![AuthorityIndex::new_for_test(1)])
2321 .equivocate(9)
2322 .build();
2323 let blocks = builder.blocks(1..=4);
2324
2325 assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2327 fixture.core.set_last_known_proposed_round(3);
2328
2329 let block = fixture.core.try_propose(true).expect("No error").unwrap();
2330 assert_eq!(block.round(), 5);
2331 assert_eq!(block.ancestors().len(), 4);
2332
2333 let extended_block =
2335 tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2336 .await
2337 .unwrap()
2338 .unwrap();
2339 assert_eq!(extended_block.block.round(), 5);
2340 assert_eq!(
2341 extended_block.block.author(),
2342 fixture.core.context.own_index
2343 );
2344 assert_eq!(extended_block.block.ancestors().len(), 4);
2345 assert_eq!(extended_block.excluded_ancestors.len(), 8);
2346 }
2347
2348 #[tokio::test]
2349 async fn test_core_set_propagation_delay_per_authority() {
2350 telemetry_subscribers::init_for_testing();
2351 let (context, _) = Context::new_for_test(4);
2352 let mut fixture = CoreTestFixture::new(
2353 context,
2354 vec![1, 1, 1, 1],
2355 AuthorityIndex::new_for_test(0),
2356 false,
2357 )
2358 .await;
2359
2360 let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
2365 fixture
2366 .transaction_vote_tracker
2367 .add_voted_blocks(vec![(test_block.clone(), vec![])]);
2368 fixture.dag_state.write().accept_block(test_block);
2370
2371 fixture
2372 .core
2373 .round_tracker_for_tests()
2374 .write()
2375 .update_from_probe(
2376 vec![
2377 vec![0, 0, 0, 0],
2378 vec![0, 0, 0, 0],
2379 vec![0, 0, 0, 0],
2380 vec![0, 0, 0, 0],
2381 ],
2382 vec![
2383 vec![0, 0, 0, 0],
2384 vec![0, 0, 0, 0],
2385 vec![0, 0, 0, 0],
2386 vec![0, 0, 0, 0],
2387 ],
2388 );
2389
2390 assert!(fixture.core.try_propose(true).unwrap().is_none());
2392
2393 fixture
2397 .core
2398 .round_tracker_for_tests()
2399 .write()
2400 .update_from_probe(
2401 vec![
2402 vec![1000, 1000, 1000, 1000],
2403 vec![1000, 1000, 1000, 1000],
2404 vec![1000, 1000, 1000, 1000],
2405 vec![1000, 1000, 1000, 1000],
2406 ],
2407 vec![
2408 vec![1000, 1000, 1000, 1000],
2409 vec![1000, 1000, 1000, 1000],
2410 vec![1000, 1000, 1000, 1000],
2411 vec![1000, 1000, 1000, 1000],
2412 ],
2413 );
2414
2415 for author in 1..4 {
2418 let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
2419 fixture
2420 .transaction_vote_tracker
2421 .add_voted_blocks(vec![(block.clone(), vec![])]);
2422 fixture.dag_state.write().accept_block(block);
2424 }
2425
2426 assert!(fixture.core.try_propose(true).unwrap().is_some());
2428 }
2429
2430 #[tokio::test(flavor = "current_thread", start_paused = true)]
2431 async fn test_leader_schedule_change() {
2432 telemetry_subscribers::init_for_testing();
2433 let default_params = Parameters::default();
2434
2435 let (context, _) = Context::new_for_test(4);
2436 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
2438
2439 let mut last_round_blocks = Vec::new();
2441 for round in 1..=30 {
2442 let mut this_round_blocks = Vec::new();
2443
2444 sleep(default_params.min_round_delay).await;
2446
2447 for core_fixture in &mut cores {
2448 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2451
2452 core_fixture
2453 .core
2454 .round_tracker_for_tests()
2455 .write()
2456 .update_from_probe(
2457 vec![
2458 vec![round, round, round, round],
2459 vec![round, round, round, round],
2460 vec![round, round, round, round],
2461 vec![round, round, round, round],
2462 ],
2463 vec![
2464 vec![round, round, round, round],
2465 vec![round, round, round, round],
2466 vec![round, round, round, round],
2467 vec![round, round, round, round],
2468 ],
2469 );
2470
2471 let new_round = receive(
2473 Duration::from_secs(1),
2474 core_fixture.signal_receivers.new_round_receiver(),
2475 )
2476 .await;
2477 assert_eq!(new_round, round);
2478
2479 let extended_block = tokio::time::timeout(
2481 Duration::from_secs(1),
2482 core_fixture.block_receiver.recv(),
2483 )
2484 .await
2485 .unwrap()
2486 .unwrap();
2487 assert_eq!(extended_block.block.round(), round);
2488 assert_eq!(
2489 extended_block.block.author(),
2490 core_fixture.core.context.own_index
2491 );
2492
2493 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2495
2496 let block = core_fixture.core.last_proposed_block();
2497
2498 assert_eq!(
2500 block.ancestors().len(),
2501 core_fixture.core.context.committee.size()
2502 );
2503 for ancestor in block.ancestors() {
2504 if block.round() > 1 {
2505 assert!(
2507 last_round_blocks
2508 .iter()
2509 .any(|block| block.reference() == *ancestor),
2510 "Reference from previous round should be added"
2511 );
2512 }
2513 }
2514 }
2515
2516 last_round_blocks = this_round_blocks;
2517 }
2518
2519 for core_fixture in cores {
2520 core_fixture.dag_state.write().flush();
2522
2523 let last_commit = core_fixture
2525 .store
2526 .read_last_commit()
2527 .unwrap()
2528 .expect("last commit should be set");
2529 assert_eq!(last_commit.index(), 27);
2533 let all_stored_commits = core_fixture
2534 .store
2535 .scan_commits((0..=CommitIndex::MAX).into())
2536 .unwrap();
2537 assert_eq!(all_stored_commits.len(), 27);
2538 assert_eq!(
2539 core_fixture
2540 .core
2541 .leader_schedule
2542 .leader_swap_table
2543 .read()
2544 .bad_nodes
2545 .len(),
2546 1
2547 );
2548 assert_eq!(
2549 core_fixture
2550 .core
2551 .leader_schedule
2552 .leader_swap_table
2553 .read()
2554 .good_nodes
2555 .len(),
2556 1
2557 );
2558 let expected_reputation_scores =
2559 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
2560 assert_eq!(
2561 core_fixture
2562 .core
2563 .leader_schedule
2564 .leader_swap_table
2565 .read()
2566 .reputation_scores,
2567 expected_reputation_scores
2568 );
2569 }
2570 }
2571
2572 #[tokio::test]
2573 async fn test_filter_new_commits() {
2574 telemetry_subscribers::init_for_testing();
2575
2576 let (context, _key_pairs) = Context::new_for_test(4);
2577 let context = context.with_parameters(Parameters {
2578 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2579 ..Default::default()
2580 });
2581
2582 let authority_index = AuthorityIndex::new_for_test(0);
2583 let core = CoreTestFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
2584 let mut core = core.core;
2585
2586 assert_eq!(
2588 core.last_proposed_round(),
2589 Some(GENESIS_ROUND),
2590 "No block should have been created other than genesis"
2591 );
2592
2593 let mut dag_builder = DagBuilder::new(core.context.clone());
2595 dag_builder.layers(1..=12).build();
2596
2597 dag_builder.print();
2599 let blocks = dag_builder.blocks(1..=6);
2600 core.dag_state.write().accept_blocks(blocks);
2601
2602 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
2604
2605 let committed_sub_dags = core.try_commit(vec![]).unwrap();
2607
2608 assert_eq!(committed_sub_dags.len(), 4);
2610
2611 println!("Case 1. Provide certified commits that are all before the last committed round.");
2613
2614 let certified_commits = sub_dags_and_commits
2616 .iter()
2617 .take(4)
2618 .map(|(_, c)| c)
2619 .cloned()
2620 .collect::<Vec<_>>();
2621 assert!(
2622 certified_commits.last().unwrap().index()
2623 <= committed_sub_dags.last().unwrap().commit_ref.index,
2624 "Highest certified commit should older than the highest committed index."
2625 );
2626
2627 let certified_commits = core.filter_new_commits(certified_commits).unwrap();
2628
2629 assert!(certified_commits.is_empty());
2631
2632 println!("Case 2. Provide certified commits that are all after the last committed round.");
2633
2634 let certified_commits = sub_dags_and_commits
2636 .iter()
2637 .take(5)
2638 .map(|(_, c)| c.clone())
2639 .collect::<Vec<_>>();
2640
2641 let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
2642
2643 assert_eq!(certified_commits.len(), 1);
2645 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
2646
2647 println!(
2648 "Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1."
2649 );
2650
2651 let certified_commits = sub_dags_and_commits
2653 .iter()
2654 .skip(5)
2655 .take(1)
2656 .map(|(_, c)| c.clone())
2657 .collect::<Vec<_>>();
2658
2659 let err = core
2660 .filter_new_commits(certified_commits.clone())
2661 .unwrap_err();
2662 match err {
2663 ConsensusError::UnexpectedCertifiedCommitIndex {
2664 expected_commit_index: 5,
2665 commit_index: 6,
2666 } => (),
2667 _ => panic!("Unexpected error: {:?}", err),
2668 }
2669 }
2670
2671 #[tokio::test]
2672 async fn test_add_certified_commits() {
2673 telemetry_subscribers::init_for_testing();
2674
2675 let (context, _key_pairs) = Context::new_for_test(4);
2676 let context = context.with_parameters(Parameters {
2677 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2678 ..Default::default()
2679 });
2680
2681 let authority_index = AuthorityIndex::new_for_test(0);
2682 let core = CoreTestFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
2683 let store = core.store.clone();
2684 let mut core = core.core;
2685
2686 assert_eq!(
2688 core.last_proposed_round(),
2689 Some(GENESIS_ROUND),
2690 "No block should have been created other than genesis"
2691 );
2692
2693 let mut dag_builder = DagBuilder::new(core.context.clone());
2695 dag_builder.layers(1..=12).build();
2696
2697 dag_builder.print();
2699 let blocks = dag_builder.blocks(1..=6);
2700 core.dag_state.write().accept_blocks(blocks);
2701
2702 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
2704
2705 let committed_sub_dags = core.try_commit(vec![]).unwrap();
2707
2708 assert_eq!(committed_sub_dags.len(), 4);
2710
2711 core.dag_state.write().flush();
2713
2714 {
2715 println!("Case 1. Provide no certified commits. No commit should happen.");
2716 let committed_sub_dags = core.try_commit(vec![]).unwrap();
2718 assert!(committed_sub_dags.is_empty());
2719 let last_commit = store
2720 .read_last_commit()
2721 .unwrap()
2722 .expect("Last commit should be set");
2723 assert_eq!(last_commit.reference().index, 4);
2724 }
2725
2726 println!(
2727 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
2728 );
2729
2730 let certified_commits = sub_dags_and_commits
2732 .iter()
2733 .skip(3)
2734 .take(5)
2735 .map(|(_, c)| c.clone())
2736 .collect::<Vec<_>>();
2737
2738 let blocks = dag_builder.blocks(7..=12);
2740 core.dag_state.write().accept_blocks(blocks);
2741
2742 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
2744 .expect("Should not fail");
2745
2746 core.dag_state.write().flush();
2748
2749 let commits = store.scan_commits((6..=10).into()).unwrap();
2750
2751 assert_eq!(commits.len(), 5);
2753
2754 for i in 6..=10 {
2755 let commit = &commits[i - 6];
2756 assert_eq!(commit.reference().index, i as u32);
2757 }
2758 }
2759
2760 #[tokio::test]
2761 async fn try_commit_with_certified_commits_gced_blocks() {
2762 const GC_DEPTH: u32 = 3;
2763 telemetry_subscribers::init_for_testing();
2764
2765 let (mut context, mut key_pairs) = Context::new_for_test(5);
2766 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2767 let context = Arc::new(context.with_parameters(Parameters {
2768 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2769 ..Default::default()
2770 }));
2771
2772 let store = Arc::new(MemStore::new());
2773 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2774
2775 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2776 let leader_schedule = Arc::new(
2777 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2778 .with_num_commits_per_schedule(10),
2779 );
2780
2781 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2782 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2783 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2784 let transaction_vote_tracker = TransactionVoteTracker::new(
2785 context.clone(),
2786 Arc::new(NoopBlockVerifier {}),
2787 dag_state.clone(),
2788 );
2789 let _block_receiver = signal_receivers.block_broadcast_receiver();
2791
2792 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2793 let commit_observer = CommitObserver::new(
2794 context.clone(),
2795 commit_consumer,
2796 dag_state.clone(),
2797 transaction_vote_tracker.clone(),
2798 )
2799 .await;
2800
2801 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2802 let mut core = Core::new_validator(
2803 context.clone(),
2804 leader_schedule,
2805 transaction_consumer,
2806 transaction_vote_tracker.clone(),
2807 block_manager,
2808 commit_observer,
2809 signals,
2810 key_pairs.remove(context.own_index.value()).1,
2811 dag_state.clone(),
2812 true,
2813 round_tracker,
2814 );
2815
2816 assert_eq!(
2818 core.last_proposed_round(),
2819 Some(GENESIS_ROUND),
2820 "No block should have been created other than genesis"
2821 );
2822
2823 let dag_str = "DAG {
2824 Round 0 : { 5 },
2825 Round 1 : { * },
2826 Round 2 : {
2827 A -> [-E1],
2828 B -> [-E1],
2829 C -> [-E1],
2830 D -> [-E1],
2831 },
2832 Round 3 : {
2833 A -> [*],
2834 B -> [*],
2835 C -> [*],
2836 D -> [*],
2837 },
2838 Round 4 : {
2839 A -> [*],
2840 B -> [*],
2841 C -> [*],
2842 D -> [*],
2843 },
2844 Round 5 : {
2845 A -> [*],
2846 B -> [*],
2847 C -> [*],
2848 D -> [*],
2849 E -> [A4, B4, C4, D4, E1]
2850 },
2851 Round 6 : { * },
2852 Round 7 : { * },
2853 }";
2854
2855 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2856 dag_builder.print();
2857
2858 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
2860 .get_sub_dag_and_certified_commits(1..=5)
2861 .into_iter()
2862 .unzip();
2863
2864 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
2867
2868 assert_eq!(committed_sub_dags.len(), 4);
2870 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
2871 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
2872
2873 for block in committed_sub_dag.blocks.iter() {
2875 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
2876 panic!("Did not expect to commit block E1");
2877 }
2878 }
2879 }
2880 }
2881
2882 #[tokio::test(flavor = "current_thread", start_paused = true)]
2883 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
2884 parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
2885 }
2886
2887 #[tokio::test(flavor = "current_thread", start_paused = true)]
2888 async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
2889 parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
2890 }
2891
2892 async fn parameterized_test_commit_on_leader_schedule_change_boundary(
2893 num_leaders_per_round: Option<usize>,
2894 ) {
2895 telemetry_subscribers::init_for_testing();
2896 let default_params = Parameters::default();
2897
2898 let (mut context, _) = Context::new_for_test(6);
2899 context
2900 .protocol_config
2901 .set_num_leaders_per_round_for_testing(num_leaders_per_round);
2902 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
2904
2905 let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
2907 for round in 1..=33 {
2908 let mut this_round_blocks = Vec::new();
2909
2910 sleep(default_params.min_round_delay).await;
2912
2913 for core_fixture in &mut cores {
2914 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2917
2918 core_fixture
2919 .core
2920 .round_tracker_for_tests()
2921 .write()
2922 .update_from_probe(
2923 vec![
2924 vec![round, round, round, round, round, round],
2925 vec![round, round, round, round, round, round],
2926 vec![round, round, round, round, round, round],
2927 vec![round, round, round, round, round, round],
2928 vec![round, round, round, round, round, round],
2929 vec![round, round, round, round, round, round],
2930 ],
2931 vec![
2932 vec![round, round, round, round, round, round],
2933 vec![round, round, round, round, round, round],
2934 vec![round, round, round, round, round, round],
2935 vec![round, round, round, round, round, round],
2936 vec![round, round, round, round, round, round],
2937 vec![round, round, round, round, round, round],
2938 ],
2939 );
2940
2941 let new_round = receive(
2943 Duration::from_secs(1),
2944 core_fixture.signal_receivers.new_round_receiver(),
2945 )
2946 .await;
2947 assert_eq!(new_round, round);
2948
2949 let extended_block = tokio::time::timeout(
2951 Duration::from_secs(1),
2952 core_fixture.block_receiver.recv(),
2953 )
2954 .await
2955 .unwrap()
2956 .unwrap();
2957 assert_eq!(extended_block.block.round(), round);
2958 assert_eq!(
2959 extended_block.block.author(),
2960 core_fixture.core.context.own_index
2961 );
2962
2963 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2965
2966 let block = core_fixture.core.last_proposed_block();
2967
2968 assert_eq!(
2970 block.ancestors().len(),
2971 core_fixture.core.context.committee.size()
2972 );
2973 for ancestor in block.ancestors() {
2974 if block.round() > 1 {
2975 assert!(
2977 last_round_blocks
2978 .iter()
2979 .any(|block| block.reference() == *ancestor),
2980 "Reference from previous round should be added"
2981 );
2982 }
2983 }
2984 }
2985
2986 last_round_blocks = this_round_blocks;
2987 }
2988
2989 for core_fixture in cores {
2990 let expected_commit_count = match num_leaders_per_round {
3002 Some(1) => 30,
3003 _ => 31,
3004 };
3005
3006 core_fixture.dag_state.write().flush();
3008
3009 let last_commit = core_fixture
3011 .store
3012 .read_last_commit()
3013 .unwrap()
3014 .expect("last commit should be set");
3015 assert_eq!(last_commit.index(), expected_commit_count);
3016 let all_stored_commits = core_fixture
3017 .store
3018 .scan_commits((0..=CommitIndex::MAX).into())
3019 .unwrap();
3020 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3021 assert_eq!(
3022 core_fixture
3023 .core
3024 .leader_schedule
3025 .leader_swap_table
3026 .read()
3027 .bad_nodes
3028 .len(),
3029 1
3030 );
3031 assert_eq!(
3032 core_fixture
3033 .core
3034 .leader_schedule
3035 .leader_swap_table
3036 .read()
3037 .good_nodes
3038 .len(),
3039 1
3040 );
3041 let expected_reputation_scores =
3042 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3043 assert_eq!(
3044 core_fixture
3045 .core
3046 .leader_schedule
3047 .leader_swap_table
3048 .read()
3049 .reputation_scores,
3050 expected_reputation_scores
3051 );
3052 }
3053 }
3054
3055 #[tokio::test]
3056 async fn test_core_signals() {
3057 telemetry_subscribers::init_for_testing();
3058 let default_params = Parameters::default();
3059
3060 let (context, _) = Context::new_for_test(4);
3061 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3063
3064 let mut last_round_blocks = Vec::new();
3066 for round in 1..=10 {
3067 let mut this_round_blocks = Vec::new();
3068
3069 sleep(default_params.min_round_delay).await;
3071
3072 for core_fixture in &mut cores {
3073 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3076
3077 core_fixture
3078 .core
3079 .round_tracker_for_tests()
3080 .write()
3081 .update_from_probe(
3082 vec![
3083 vec![round, round, round, round],
3084 vec![round, round, round, round],
3085 vec![round, round, round, round],
3086 vec![round, round, round, round],
3087 ],
3088 vec![
3089 vec![round, round, round, round],
3090 vec![round, round, round, round],
3091 vec![round, round, round, round],
3092 vec![round, round, round, round],
3093 ],
3094 );
3095
3096 let new_round = receive(
3098 Duration::from_secs(1),
3099 core_fixture.signal_receivers.new_round_receiver(),
3100 )
3101 .await;
3102 assert_eq!(new_round, round);
3103
3104 let extended_block = tokio::time::timeout(
3106 Duration::from_secs(1),
3107 core_fixture.block_receiver.recv(),
3108 )
3109 .await
3110 .unwrap()
3111 .unwrap();
3112 assert_eq!(extended_block.block.round(), round);
3113 assert_eq!(
3114 extended_block.block.author(),
3115 core_fixture.core.context.own_index
3116 );
3117
3118 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3120
3121 let block = core_fixture.core.last_proposed_block();
3122
3123 assert_eq!(
3125 block.ancestors().len(),
3126 core_fixture.core.context.committee.size()
3127 );
3128 for ancestor in block.ancestors() {
3129 if block.round() > 1 {
3130 assert!(
3132 last_round_blocks
3133 .iter()
3134 .any(|block| block.reference() == *ancestor),
3135 "Reference from previous round should be added"
3136 );
3137 }
3138 }
3139 }
3140
3141 last_round_blocks = this_round_blocks;
3142 }
3143
3144 for core_fixture in cores {
3145 core_fixture.dag_state.write().flush();
3147 let last_commit = core_fixture
3149 .store
3150 .read_last_commit()
3151 .unwrap()
3152 .expect("last commit should be set");
3153 assert_eq!(last_commit.index(), 7);
3157 let all_stored_commits = core_fixture
3158 .store
3159 .scan_commits((0..=CommitIndex::MAX).into())
3160 .unwrap();
3161 assert_eq!(all_stored_commits.len(), 7);
3162 }
3163 }
3164
3165 #[tokio::test]
3166 async fn test_core_compress_proposal_references() {
3167 telemetry_subscribers::init_for_testing();
3168 let default_params = Parameters::default();
3169
3170 let (context, _) = Context::new_for_test(4);
3171 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3173
3174 let mut last_round_blocks = Vec::new();
3175 let mut all_blocks = Vec::new();
3176
3177 let excluded_authority = AuthorityIndex::new_for_test(3);
3178
3179 for round in 1..=10 {
3180 let mut this_round_blocks = Vec::new();
3181
3182 for core_fixture in &mut cores {
3183 if core_fixture.core.context.own_index == excluded_authority {
3185 continue;
3186 }
3187
3188 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3190 core_fixture
3191 .core
3192 .round_tracker_for_tests()
3193 .write()
3194 .update_from_probe(
3195 vec![
3196 vec![round, round, round, round],
3197 vec![round, round, round, round],
3198 vec![round, round, round, round],
3199 vec![round, round, round, round],
3200 ],
3201 vec![
3202 vec![round, round, round, round],
3203 vec![round, round, round, round],
3204 vec![round, round, round, round],
3205 vec![round, round, round, round],
3206 ],
3207 );
3208 core_fixture.core.new_block(round, true).unwrap();
3209
3210 let block = core_fixture.core.last_proposed_block();
3211 assert_eq!(block.round(), round);
3212
3213 this_round_blocks.push(block.clone());
3215 }
3216
3217 last_round_blocks = this_round_blocks.clone();
3218 all_blocks.extend(this_round_blocks);
3219 }
3220
3221 let core_fixture = &mut cores[excluded_authority];
3225 sleep(default_params.min_round_delay).await;
3227 core_fixture.add_blocks(all_blocks).unwrap();
3229
3230 let block = core_fixture.core.last_proposed_block();
3233 assert_eq!(block.round(), 11);
3234 assert_eq!(block.ancestors().len(), 4);
3235 for block_ref in block.ancestors() {
3236 if block_ref.author == excluded_authority {
3237 assert_eq!(block_ref.round, 1);
3238 } else {
3239 assert_eq!(block_ref.round, 10);
3240 }
3241 }
3242
3243 core_fixture.dag_state.write().flush();
3245
3246 let last_commit = core_fixture
3248 .store
3249 .read_last_commit()
3250 .unwrap()
3251 .expect("last commit should be set");
3252 assert_eq!(last_commit.index(), 6);
3256 let all_stored_commits = core_fixture
3257 .store
3258 .scan_commits((0..=CommitIndex::MAX).into())
3259 .unwrap();
3260 assert_eq!(all_stored_commits.len(), 6);
3261 }
3262
3263 #[tokio::test]
3264 async fn try_select_certified_leaders() {
3265 telemetry_subscribers::init_for_testing();
3267
3268 let (context, _) = Context::new_for_test(4);
3269
3270 let authority_index = AuthorityIndex::new_for_test(0);
3271 let core =
3272 CoreTestFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
3273 let mut core = core.core;
3274
3275 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
3276 dag_builder.layers(1..=12).build();
3277
3278 let limit = 2;
3279
3280 let blocks = dag_builder.blocks(1..=12);
3281 core.dag_state.write().accept_blocks(blocks);
3282
3283 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
3285 let mut certified_commits = sub_dags_and_commits
3286 .into_iter()
3287 .map(|(_, commit)| commit)
3288 .collect::<Vec<_>>();
3289
3290 let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
3291
3292 assert_eq!(leaders.len(), 2);
3294 assert_eq!(certified_commits.len(), 2);
3295 }
3296
3297 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
3298 tokio::time::timeout(timeout, receiver.changed())
3299 .await
3300 .expect("Timeout while waiting to read from receiver")
3301 .expect("Signal receive channel shouldn't be closed");
3302 *receiver.borrow_and_update()
3303 }
3304}