1use std::{
5 collections::{BTreeMap, BTreeSet},
6 iter,
7 sync::Arc,
8 time::Duration,
9 vec,
10};
11
12use consensus_config::{AuthorityIndex, ProtocolKeyPair};
13#[cfg(test)]
14use consensus_config::{Stake, local_committee_and_keys};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
16use itertools::Itertools as _;
17#[cfg(test)]
18use mysten_metrics::monitored_mpsc::UnboundedReceiver;
19use mysten_metrics::monitored_scope;
20use parking_lot::RwLock;
21use sui_macros::fail_point;
22use tokio::{
23 sync::{broadcast, watch},
24 time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30 CommitConsumerArgs, TransactionClient, block_verifier::NoopBlockVerifier,
31 storage::mem_store::MemStore,
32};
33use crate::{
34 ancestor::{AncestorState, AncestorStateManager},
35 block::{
36 Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
37 VerifiedBlock,
38 },
39 block_manager::BlockManager,
40 commit::{
41 CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42 },
43 commit_observer::CommitObserver,
44 context::Context,
45 dag_state::DagState,
46 error::{ConsensusError, ConsensusResult},
47 leader_schedule::LeaderSchedule,
48 round_tracker::RoundTracker,
49 stake_aggregator::{QuorumThreshold, StakeAggregator},
50 transaction::TransactionConsumer,
51 transaction_certifier::TransactionCertifier,
52 universal_committer::{
53 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
54 },
55};
56
/// Limit passed to `DagState::take_commit_votes` when gathering commit votes for a newly
/// proposed block (see `Core::try_new_block`).
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
60
/// Central consensus component of this authority: it accepts blocks, proposes own blocks and
/// drives the commit process, using the collaborators held in its fields.
pub(crate) struct Core {
    /// Shared context; this file reads the committee, parameters, protocol config, clock,
    /// own authority index and metrics from it.
    context: Arc<Context>,
    /// Supplies the transactions for each proposed block and is notified about the status of
    /// own blocks after commits.
    transaction_consumer: TransactionConsumer,
    /// Provides own transaction votes and records own voted blocks; only used when
    /// `mysticeti_fastpath` is enabled in the protocol config.
    transaction_certifier: TransactionCertifier,
    /// Accepts blocks (own and received) and tracks the missing ones.
    block_manager: BlockManager,
    /// Latest propagation delay set via `set_propagation_delay`; proposing stops while it
    /// exceeds `propagation_delay_stop_proposal_threshold`.
    propagation_delay: Round,
    /// Decides leaders locally (`try_decide`) and reports the leaders of a round.
    committer: UniversalCommitter,
    /// Last round that was sent through `signals.new_round`.
    last_signaled_round: Round,
    /// Per authority, the reference of the last block included as an ancestor in an own
    /// proposal (`None` if nothing has been included yet).
    last_included_ancestors: Vec<Option<BlockRef>>,
    /// Slot of the last decided leader; the starting point handed to `committer.try_decide`.
    last_decided_leader: Slot,
    /// Leader schedule, updated inside `try_commit` when a schedule change is due.
    leader_schedule: Arc<LeaderSchedule>,
    /// Receives the sequenced leaders and returns the committed sub dags.
    commit_observer: CommitObserver,
    /// Outgoing notifications for new own blocks and new rounds.
    signals: CoreSignals,
    /// Key pair used to sign proposed blocks.
    block_signer: ProtocolKeyPair,
    /// Shared DAG state (threshold clock, last proposed block, commit info, cached blocks).
    dag_state: Arc<RwLock<DagState>>,
    /// `None` until known; `should_propose` refuses to propose while this is `None` or while
    /// the threshold clock round is not above it.
    last_known_proposed_round: Option<Round>,
    /// Tracks the include/exclude state of each authority as a proposal ancestor.
    ancestor_state_manager: AncestorStateManager,
    /// Shared tracker used to compute accepted quorum rounds; updated with every own proposal.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
117
118impl Core {
119 pub(crate) fn new(
120 context: Arc<Context>,
121 leader_schedule: Arc<LeaderSchedule>,
122 transaction_consumer: TransactionConsumer,
123 transaction_certifier: TransactionCertifier,
124 block_manager: BlockManager,
125 commit_observer: CommitObserver,
126 signals: CoreSignals,
127 block_signer: ProtocolKeyPair,
128 dag_state: Arc<RwLock<DagState>>,
129 sync_last_known_own_block: bool,
130 round_tracker: Arc<RwLock<RoundTracker>>,
131 ) -> Self {
132 let last_decided_leader = dag_state.read().last_commit_leader();
133 let number_of_leaders = context
134 .protocol_config
135 .mysticeti_num_leaders_per_round()
136 .unwrap_or(1);
137 let committer = UniversalCommitterBuilder::new(
138 context.clone(),
139 leader_schedule.clone(),
140 dag_state.clone(),
141 )
142 .with_number_of_leaders(number_of_leaders)
143 .with_pipeline(true)
144 .build();
145
146 let last_proposed_block = dag_state.read().get_last_proposed_block();
147
148 let last_signaled_round = last_proposed_block.round();
149
150 let mut last_included_ancestors = vec![None; context.committee.size()];
159 for ancestor in last_proposed_block.ancestors() {
160 last_included_ancestors[ancestor.author] = Some(*ancestor);
161 }
162
163 let min_propose_round = if sync_last_known_own_block {
164 None
165 } else {
166 Some(0)
168 };
169
170 let propagation_scores = leader_schedule
171 .leader_swap_table
172 .read()
173 .reputation_scores
174 .clone();
175 let mut ancestor_state_manager =
176 AncestorStateManager::new(context.clone(), dag_state.clone());
177 ancestor_state_manager.set_propagation_scores(propagation_scores);
178
179 Self {
180 context,
181 last_signaled_round,
182 last_included_ancestors,
183 last_decided_leader,
184 leader_schedule,
185 transaction_consumer,
186 transaction_certifier,
187 block_manager,
188 propagation_delay: 0,
189 committer,
190 commit_observer,
191 signals,
192 block_signer,
193 dag_state,
194 last_known_proposed_round: min_propose_round,
195 ancestor_state_manager,
196 round_tracker,
197 }
198 .recover()
199 }
200
201 fn recover(mut self) -> Self {
202 let _s = self
203 .context
204 .metrics
205 .node_metrics
206 .scope_processing_time
207 .with_label_values(&["Core::recover"])
208 .start_timer();
209
210 self.try_commit(vec![]).unwrap();
212
213 let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
214 {
215 last_proposed_block
216 } else {
217 let last_proposed_block = self.dag_state.read().get_last_proposed_block();
218
219 if self.should_propose() {
220 assert!(
221 last_proposed_block.round() > GENESIS_ROUND,
222 "At minimum a block of round higher than genesis should have been produced during recovery"
223 );
224 }
225
226 self.signals
228 .new_block(ExtendedBlock {
229 block: last_proposed_block.clone(),
230 excluded_ancestors: vec![],
231 })
232 .unwrap();
233 last_proposed_block
234 };
235
236 self.try_signal_new_round();
240
241 info!(
242 "Core recovery completed with last proposed block {:?}",
243 last_proposed_block
244 );
245
246 self
247 }
248
249 #[tracing::instrument(skip_all)]
253 pub(crate) fn add_blocks(
254 &mut self,
255 blocks: Vec<VerifiedBlock>,
256 ) -> ConsensusResult<BTreeSet<BlockRef>> {
257 let _scope = monitored_scope("Core::add_blocks");
258 let _s = self
259 .context
260 .metrics
261 .node_metrics
262 .scope_processing_time
263 .with_label_values(&["Core::add_blocks"])
264 .start_timer();
265 self.context
266 .metrics
267 .node_metrics
268 .core_add_blocks_batch_size
269 .observe(blocks.len() as f64);
270
271 let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
272
273 if !accepted_blocks.is_empty() {
274 trace!(
275 "Accepted blocks: {}",
276 accepted_blocks
277 .iter()
278 .map(|b| b.reference().to_string())
279 .join(",")
280 );
281
282 self.try_commit(vec![])?;
284
285 self.try_propose(false)?;
287
288 self.try_signal_new_round();
292 };
293
294 if !missing_block_refs.is_empty() {
295 trace!(
296 "Missing block refs: {}",
297 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
298 );
299 }
300
301 Ok(missing_block_refs)
302 }
303
304 #[tracing::instrument(skip_all)]
309 pub(crate) fn add_certified_commits(
310 &mut self,
311 certified_commits: CertifiedCommits,
312 ) -> ConsensusResult<BTreeSet<BlockRef>> {
313 let _scope = monitored_scope("Core::add_certified_commits");
314
315 let votes = certified_commits.votes().to_vec();
316 let commits = self
317 .filter_new_commits(certified_commits.commits().to_vec())
318 .expect("Certified commits validation failed");
319
320 let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);
324
325 self.try_commit(commits)?;
327
328 self.try_propose(false)?;
330
331 self.try_signal_new_round();
335
336 Ok(missing_block_refs)
337 }
338
339 pub(crate) fn check_block_refs(
342 &mut self,
343 block_refs: Vec<BlockRef>,
344 ) -> ConsensusResult<BTreeSet<BlockRef>> {
345 let _scope = monitored_scope("Core::check_block_refs");
346 let _s = self
347 .context
348 .metrics
349 .node_metrics
350 .scope_processing_time
351 .with_label_values(&["Core::check_block_refs"])
352 .start_timer();
353 self.context
354 .metrics
355 .node_metrics
356 .core_check_block_refs_batch_size
357 .observe(block_refs.len() as f64);
358
359 let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
361
362 if !missing_block_refs.is_empty() {
363 trace!(
364 "Missing block refs: {}",
365 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
366 );
367 }
368 Ok(missing_block_refs)
369 }
370
371 fn try_signal_new_round(&mut self) {
373 let new_clock_round = self.dag_state.read().threshold_clock_round();
378 if new_clock_round <= self.last_signaled_round {
379 return;
380 }
381 self.signals.new_round(new_clock_round);
383 self.last_signaled_round = new_clock_round;
384
385 self.context
387 .metrics
388 .node_metrics
389 .threshold_clock_round
390 .set(new_clock_round as i64);
391 }
392
393 pub(crate) fn new_block(
397 &mut self,
398 round: Round,
399 force: bool,
400 ) -> ConsensusResult<Option<VerifiedBlock>> {
401 let _scope = monitored_scope("Core::new_block");
402 if self.last_proposed_round() < round {
403 self.context
404 .metrics
405 .node_metrics
406 .leader_timeout_total
407 .with_label_values(&[&format!("{force}")])
408 .inc();
409 let result = self.try_propose(force);
410 self.try_signal_new_round();
412 return result;
413 }
414 Ok(None)
415 }
416
417 fn filter_new_commits(
420 &mut self,
421 commits: Vec<CertifiedCommit>,
422 ) -> ConsensusResult<Vec<CertifiedCommit>> {
423 let last_commit_index = self.dag_state.read().last_commit_index();
425 let commits = commits
426 .iter()
427 .filter(|commit| {
428 if commit.index() > last_commit_index {
429 true
430 } else {
431 tracing::debug!(
432 "Skip commit for index {} as it is already committed with last commit index {}",
433 commit.index(),
434 last_commit_index
435 );
436 false
437 }
438 })
439 .cloned()
440 .collect::<Vec<_>>();
441
442 if let Some(commit) = commits.first()
444 && commit.index() != last_commit_index + 1
445 {
446 return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
447 expected_commit_index: last_commit_index + 1,
448 commit_index: commit.index(),
449 });
450 }
451
452 Ok(commits)
453 }
454
455 fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
459 if !self.should_propose() {
460 return Ok(None);
461 }
462 if let Some(extended_block) = self.try_new_block(force) {
463 self.signals.new_block(extended_block.clone())?;
464
465 fail_point!("consensus-after-propose");
466
467 self.try_commit(vec![])?;
469 return Ok(Some(extended_block.block));
470 }
471 Ok(None)
472 }
473
/// Attempts to create, sign and locally accept a new own block for the current threshold
/// clock round.
///
/// Returns `None` when the clock round is not above the last proposed round, or when `force`
/// is `false` and one of these holds: a leader block of the previous round is missing, the
/// minimum round delay has not elapsed since the last proposal, or ancestor selection
/// returned no ancestors. With `force == true` those waits are skipped.
///
/// On success the block has been accepted by the block manager, `dag_state` has been flushed,
/// the included transactions have been acknowledged and the round tracker has been updated.
///
/// # Panics
/// Panics if no ancestors are returned while `force` is `true`, if signing or serializing the
/// block fails, or if the block manager does not accept the own block cleanly.
fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
    let _s = self
        .context
        .metrics
        .node_metrics
        .scope_processing_time
        .with_label_values(&["Core::try_new_block"])
        .start_timer();

    // Only propose for a round strictly above the last proposed one. The read guard lives
    // only inside this block expression.
    let clock_round = {
        let dag_state = self.dag_state.read();
        let clock_round = dag_state.threshold_clock_round();
        if clock_round <= dag_state.get_last_proposed_block().round() {
            debug!(
                "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
                clock_round,
                dag_state.get_last_proposed_block().round()
            );
            return None;
        }
        clock_round
    };

    // Round immediately below the one being proposed.
    let quorum_round = clock_round.saturating_sub(1);

    // Unless forced, wait for the leader(s) of the previous round and respect the minimum
    // delay between own proposals.
    if !force {
        if !self.leaders_exist(quorum_round) {
            return None;
        }

        if Duration::from_millis(
            self.context
                .clock
                .timestamp_utc_ms()
                .saturating_sub(self.last_proposed_timestamp_ms()),
        ) < self.context.parameters.min_round_delay
        {
            debug!(
                "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
                clock_round,
                self.last_proposed_timestamp_ms(),
                self.context.parameters.min_round_delay.as_millis(),
            );
            return None;
        }
    }

    // Smart ancestor selection is enabled only for non-forced proposals.
    let (ancestors, excluded_and_equivocating_ancestors) =
        self.smart_ancestors_to_propose(clock_round, !force);

    // An empty result means selection decided to wait; that must not happen when forcing.
    if ancestors.is_empty() {
        assert!(
            !force,
            "Ancestors should have been returned if force is true!"
        );
        debug!(
            "Skipping block proposal for round {} because no good ancestor is found",
            clock_round,
        );
        return None;
    }

    // Cap the number of excluded ancestors carried with the block at twice the committee size.
    let excluded_ancestors_limit = self.context.committee.size() * 2;
    if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
        debug!(
            "Dropping {} excluded ancestor(s) during proposal due to size limit",
            excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
        );
    }
    let excluded_ancestors = excluded_and_equivocating_ancestors
        .into_iter()
        .take(excluded_ancestors_limit)
        .collect();

    // Remember, per authority, the latest ancestor included in an own proposal.
    for ancestor in &ancestors {
        self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
    }

    // Record how long the proposal waited since the threshold clock quorum timestamp,
    // labelled with the first leader of the previous round.
    let leader_authority = &self
        .context
        .committee
        .authority(self.first_leader(quorum_round))
        .hostname;
    self.context
        .metrics
        .node_metrics
        .block_proposal_leader_wait_ms
        .with_label_values(&[leader_authority])
        .inc_by(
            Instant::now()
                .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                .as_millis() as u64,
        );
    self.context
        .metrics
        .node_metrics
        .block_proposal_leader_wait_count
        .with_label_values(&[leader_authority])
        .inc();

    // Ancestor count and per-authority depth (clock round minus ancestor round) metrics.
    self.context
        .metrics
        .node_metrics
        .proposed_block_ancestors
        .observe(ancestors.len() as f64);
    for ancestor in &ancestors {
        let authority = &self.context.committee.authority(ancestor.author()).hostname;
        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors_depth
            .with_label_values(&[authority])
            .observe(clock_round.saturating_sub(ancestor.round()).into());
    }

    // `now` becomes the block timestamp. Ancestors with a later timestamp are only logged
    // and counted in the drift metric; they are still included.
    let now = self.context.clock.timestamp_utc_ms();
    ancestors.iter().for_each(|block| {
        if block.timestamp_ms() > now {
            trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
            let authority = &self.context.committee.authority(block.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_timestamp_drift_ms
                .with_label_values(&[authority])
                .inc_by(block.timestamp_ms().saturating_sub(now));
        }
    });

    // Pull the transactions for this block together with the callback that acknowledges
    // them once the block exists; the third tuple element is unused here.
    let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
    self.context
        .metrics
        .node_metrics
        .proposed_block_transactions
        .observe(transactions.len() as f64);

    // Take up to MAX_COMMIT_VOTES_PER_BLOCK pending commit votes from the DAG state.
    let commit_votes = self
        .dag_state
        .write()
        .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

    // With fastpath enabled, link the causal history of every ancestor and collect own
    // transaction votes for the resulting history; otherwise there are no transaction votes.
    let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
        let new_causal_history = {
            let mut dag_state = self.dag_state.write();
            ancestors
                .iter()
                .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
                .collect()
        };
        self.transaction_certifier.get_own_votes(new_causal_history)
    } else {
        vec![]
    };

    // Fastpath uses the V2 block format (which carries transaction votes); otherwise V1.
    let block = if self.context.protocol_config.mysticeti_fastpath() {
        Block::V2(BlockV2::new(
            self.context.committee.epoch(),
            clock_round,
            self.context.own_index,
            now,
            ancestors.iter().map(|b| b.reference()).collect(),
            transactions,
            commit_votes,
            transaction_votes,
            vec![],
        ))
    } else {
        Block::V1(BlockV1::new(
            self.context.committee.epoch(),
            clock_round,
            self.context.own_index,
            now,
            ancestors.iter().map(|b| b.reference()).collect(),
            transactions,
            commit_votes,
            vec![],
        ))
    };
    // Sign and serialize; a failure of either aborts via `expect`.
    let signed_block =
        SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
    let serialized = signed_block
        .serialize()
        .expect("Block serialization failed.");
    self.context
        .metrics
        .node_metrics
        .proposed_block_size
        .observe(serialized.len() as f64);
    // The block was built locally, so it is wrapped as verified directly.
    let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

    // Record the interval since the previous proposal, skipped when that was a round 0 block.
    let last_proposed_block = self.last_proposed_block();
    if last_proposed_block.round() > 0 {
        self.context
            .metrics
            .node_metrics
            .block_proposal_interval
            .observe(
                Duration::from_millis(
                    verified_block
                        .timestamp_ms()
                        .saturating_sub(last_proposed_block.timestamp_ms()),
                )
                .as_secs_f64(),
            );
    }

    // The own block must be accepted immediately and must not report missing ancestors.
    let (accepted_blocks, missing) = self
        .block_manager
        .try_accept_blocks(vec![verified_block.clone()]);
    assert_eq!(accepted_blocks.len(), 1);
    assert!(missing.is_empty());

    // With fastpath, register the own block with the certifier (with an empty second tuple
    // element) and link its causal history.
    if self.context.protocol_config.mysticeti_fastpath() {
        self.transaction_certifier
            .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
        self.dag_state
            .write()
            .link_causal_history(verified_block.reference());
    }

    // Flush the DAG state before acknowledging the transactions.
    // NOTE(review): presumably `flush` persists the new block — confirm in `DagState`.
    self.dag_state.write().flush();

    // Tell the transaction consumer which block its transactions ended up in.
    ack_transactions(verified_block.reference());

    info!("Created block {verified_block:?} for round {clock_round}");

    self.context
        .metrics
        .node_metrics
        .proposed_blocks
        .with_label_values(&[&force.to_string()])
        .inc();

    let extended_block = ExtendedBlock {
        block: verified_block,
        excluded_ancestors,
    };

    // Feed the own proposal (including excluded ancestors) into the round tracker.
    self.round_tracker
        .write()
        .update_from_verified_block(&extended_block);

    Some(extended_block)
}
740
741 fn try_commit(
744 &mut self,
745 mut certified_commits: Vec<CertifiedCommit>,
746 ) -> ConsensusResult<Vec<CommittedSubDag>> {
747 let _s = self
748 .context
749 .metrics
750 .node_metrics
751 .scope_processing_time
752 .with_label_values(&["Core::try_commit"])
753 .start_timer();
754
755 let mut certified_commits_map = BTreeMap::new();
756 for c in &certified_commits {
757 certified_commits_map.insert(c.index(), c.reference());
758 }
759
760 if !certified_commits.is_empty() {
761 info!(
762 "Processing synced commits: {:?}",
763 certified_commits
764 .iter()
765 .map(|c| (c.index(), c.leader()))
766 .collect::<Vec<_>>()
767 );
768 }
769
770 let mut committed_sub_dags = Vec::new();
771 loop {
773 let mut commits_until_update = self
778 .leader_schedule
779 .commits_until_leader_schedule_update(self.dag_state.clone());
780
781 if commits_until_update == 0 {
782 let last_commit_index = self.dag_state.read().last_commit_index();
783
784 tracing::info!(
785 "Leader schedule change triggered at commit index {last_commit_index}"
786 );
787
788 self.leader_schedule
789 .update_leader_schedule_v2(&self.dag_state);
790
791 let propagation_scores = self
792 .leader_schedule
793 .leader_swap_table
794 .read()
795 .reputation_scores
796 .clone();
797 self.ancestor_state_manager
798 .set_propagation_scores(propagation_scores);
799
800 commits_until_update = self
801 .leader_schedule
802 .commits_until_leader_schedule_update(self.dag_state.clone());
803
804 fail_point!("consensus-after-leader-schedule-change");
805 }
806 assert!(commits_until_update > 0);
807
808 let (certified_leaders, decided_certified_commits): (
811 Vec<DecidedLeader>,
812 Vec<CertifiedCommit>,
813 ) = self
814 .try_select_certified_leaders(&mut certified_commits, commits_until_update)
815 .into_iter()
816 .unzip();
817
818 let blocks = decided_certified_commits
825 .iter()
826 .flat_map(|c| c.blocks())
827 .cloned()
828 .collect::<Vec<_>>();
829 self.block_manager.try_accept_committed_blocks(blocks);
830
831 let (decided_leaders, local) = if certified_leaders.is_empty() {
833 let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
835 if decided_leaders.len() >= commits_until_update {
837 let _ = decided_leaders.split_off(commits_until_update);
838 }
839 (decided_leaders, true)
840 } else {
841 (certified_leaders, false)
842 };
843
844 let Some(last_decided) = decided_leaders.last().cloned() else {
846 break;
847 };
848
849 self.last_decided_leader = last_decided.slot();
850 self.context
851 .metrics
852 .node_metrics
853 .last_decided_leader_round
854 .set(self.last_decided_leader.round as i64);
855
856 let sequenced_leaders = decided_leaders
857 .into_iter()
858 .filter_map(|leader| leader.into_committed_block())
859 .collect::<Vec<_>>();
860 if sequenced_leaders.is_empty() {
863 break;
864 }
865 tracing::info!(
866 "Committing {} leaders: {}; {} commits before next leader schedule change",
867 sequenced_leaders.len(),
868 sequenced_leaders
869 .iter()
870 .map(|b| b.reference().to_string())
871 .join(","),
872 commits_until_update,
873 );
874
875 let subdags = self
877 .commit_observer
878 .handle_commit(sequenced_leaders, local)?;
879
880 self.block_manager
882 .try_unsuspend_blocks_for_latest_gc_round();
883
884 committed_sub_dags.extend(subdags);
885
886 fail_point!("consensus-after-handle-commit");
887 }
888
889 for sub_dag in &committed_sub_dags {
891 if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
892 assert_eq!(
893 commit_ref, sub_dag.commit_ref,
894 "Certified commit has different reference than the committed sub dag"
895 );
896 }
897 }
898
899 let committed_block_refs = committed_sub_dags
901 .iter()
902 .flat_map(|sub_dag| sub_dag.blocks.iter())
903 .filter_map(|block| {
904 (block.author() == self.context.own_index).then_some(block.reference())
905 })
906 .collect::<Vec<_>>();
907 self.transaction_consumer
908 .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
909
910 Ok(committed_sub_dags)
911 }
912
913 pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
914 let _scope = monitored_scope("Core::get_missing_blocks");
915 self.block_manager.missing_blocks()
916 }
917
918 pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
920 info!("Propagation round delay set to: {delay}");
921 self.propagation_delay = delay;
922 }
923
924 pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
928 if self.last_known_proposed_round.is_some() {
929 panic!(
930 "Should not attempt to set the last known proposed round if that has been already set"
931 );
932 }
933 self.last_known_proposed_round = Some(round);
934 info!("Last known proposed round set to {round}");
935 }
936
937 pub(crate) fn should_propose(&self) -> bool {
939 let clock_round = self.dag_state.read().threshold_clock_round();
940 let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
941
942 if self.propagation_delay
943 > self
944 .context
945 .parameters
946 .propagation_delay_stop_proposal_threshold
947 {
948 debug!(
949 "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
950 self.propagation_delay,
951 self.context
952 .parameters
953 .propagation_delay_stop_proposal_threshold
954 );
955 core_skipped_proposals
956 .with_label_values(&["high_propagation_delay"])
957 .inc();
958 return false;
959 }
960
961 let Some(last_known_proposed_round) = self.last_known_proposed_round else {
962 debug!(
963 "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
964 );
965 core_skipped_proposals
966 .with_label_values(&["no_last_known_proposed_round"])
967 .inc();
968 return false;
969 };
970 if clock_round <= last_known_proposed_round {
971 debug!(
972 "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
973 );
974 core_skipped_proposals
975 .with_label_values(&["higher_last_known_proposed_round"])
976 .inc();
977 return false;
978 }
979
980 true
981 }
982
983 #[tracing::instrument(skip_all)]
989 fn try_select_certified_leaders(
990 &mut self,
991 certified_commits: &mut Vec<CertifiedCommit>,
992 limit: usize,
993 ) -> Vec<(DecidedLeader, CertifiedCommit)> {
994 assert!(limit > 0, "limit should be greater than 0");
995 if certified_commits.is_empty() {
996 return vec![];
997 }
998
999 let to_commit = if certified_commits.len() >= limit {
1000 certified_commits.drain(..limit).collect::<Vec<_>>()
1002 } else {
1003 std::mem::take(certified_commits)
1005 };
1006
1007 tracing::debug!(
1008 "Selected {} certified leaders: {}",
1009 to_commit.len(),
1010 to_commit.iter().map(|c| c.leader().to_string()).join(",")
1011 );
1012
1013 to_commit
1014 .into_iter()
1015 .map(|commit| {
1016 let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1017 assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1018 let leader = DecidedLeader::Commit(leader.clone(), false);
1020 UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1021 (leader, commit)
1022 })
1023 .collect::<Vec<_>>()
1024 }
1025
/// Selects the ancestors for a proposal at `clock_round`.
///
/// Returns the blocks to use as ancestors (the own last proposed block first) together with
/// the references of ancestors that were excluded or are equivocating.
///
/// With `smart_select == true`, returns two empty collections when the ancestors in the
/// `Include` state do not reach a quorum at round `clock_round - 1`, so the caller can wait
/// for more blocks. With `smart_select == false`, ancestors in the `Exclude` state at that
/// round are added, highest score first, until the quorum is reached.
///
/// # Panics
/// Panics if `dag_state` does not return one entry per committee member, or if the parent
/// round quorum is still not reached at the end of the selection.
fn smart_ancestors_to_propose(
    &mut self,
    clock_round: Round,
    smart_select: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
    let node_metrics = &self.context.metrics.node_metrics;
    let _s = node_metrics
        .scope_processing_time
        .with_label_values(&["Core::smart_ancestors_to_propose"])
        .start_timer();

    // Last cached block per authority for `clock_round`; each entry also carries that
    // authority's equivocating blocks.
    let all_ancestors = self
        .dag_state
        .read()
        .get_last_cached_block_per_authority(clock_round);

    assert_eq!(
        all_ancestors.len(),
        self.context.committee.size(),
        "Fatal error, number of returned ancestors don't match committee size."
    );

    // Refresh the include/exclude state of every authority from the accepted quorum rounds.
    let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();

    self.ancestor_state_manager
        .update_all_ancestors_state(&accepted_quorum_rounds);

    let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

    // Round immediately below the one being proposed; a quorum of parents is needed there.
    let quorum_round = clock_round.saturating_sub(1);

    let mut score_and_pending_excluded_ancestors = Vec::new();
    let mut excluded_and_equivocating_ancestors = BTreeSet::new();

    // The own last proposed block always comes first, followed by the other authorities'
    // blocks that are new since the last proposal and in the `Include` state.
    let included_ancestors = iter::once(self.last_proposed_block().clone())
        .chain(
            all_ancestors
                .into_iter()
                .flat_map(|(ancestor, equivocating_ancestors)| {
                    // The own block is already provided by `iter::once` above.
                    if ancestor.author() == self.context.own_index {
                        return None;
                    }
                    // Skip blocks not newer than what was already included for this author.
                    if let Some(last_block_ref) =
                        self.last_included_ancestors[ancestor.author()]
                        && last_block_ref.round >= ancestor.round() {
                        return None;
                    }

                    // Equivocating blocks are always reported as excluded.
                    excluded_and_equivocating_ancestors.extend(equivocating_ancestors);

                    let ancestor_state = ancestor_state_map[ancestor.author()];
                    match ancestor_state {
                        AncestorState::Include => {
                            trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                        }
                        AncestorState::Exclude(score) => {
                            // Kept aside; may still be included further below.
                            trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                            score_and_pending_excluded_ancestors.push((score, ancestor));
                            return None;
                        }
                    }

                    Some(ancestor)
                }),
        )
        .collect::<Vec<_>>();

    let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();

    // Only ancestors at the parent round count towards the quorum.
    for ancestor in included_ancestors
        .iter()
        .filter(|a| a.round() == quorum_round)
    {
        parent_round_quorum.add(ancestor.author(), &self.context.committee);
    }

    // With smart selection, signal "wait" with empty results when the quorum is not reached.
    if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
        node_metrics.smart_selection_wait.inc();
        debug!(
            "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
            parent_round_quorum.stake()
        );
        return (vec![], BTreeSet::new());
    }

    // Highest score first.
    score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));

    // While the quorum is still missing, pull in excluded parent-round ancestors in score
    // order; everything else stays excluded for now.
    let mut ancestors_to_propose = included_ancestors;
    let mut excluded_ancestors = Vec::new();
    for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
        let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
        if !parent_round_quorum.reached_threshold(&self.context.committee)
            && ancestor.round() == quorum_round
        {
            debug!(
                "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
            );
            parent_round_quorum.add(ancestor.author(), &self.context.committee);
            ancestors_to_propose.push(ancestor);
            node_metrics
                .included_excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname, "timeout"])
                .inc();
        } else {
            excluded_ancestors.push((score, ancestor));
        }
    }

    // For each remaining excluded ancestor, decide whether it (or an earlier block of the
    // same author) can still be included based on the author's accepted low quorum round.
    for (score, ancestor) in excluded_ancestors.iter() {
        let excluded_author = ancestor.author();
        let block_hostname = &self.context.committee.authority(excluded_author).hostname;
        // First element of the author's accepted quorum round pair, capped at the parent round.
        let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
        accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

        let last_included_round = self.last_included_ancestors[excluded_author]
            .map(|block_ref| block_ref.round)
            .unwrap_or(GENESIS_ROUND);
        // Nothing newer than what was already included: neither included nor reported.
        if ancestor.round() <= last_included_round {
            continue;
        }

        // The last included round already covers the accepted low quorum round: exclude.
        if last_included_round >= accepted_low_quorum_round {
            excluded_and_equivocating_ancestors.insert(ancestor.reference());
            trace!(
                "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
                ancestor.reference()
            );
            node_metrics
                .excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname])
                .inc();
            continue;
        }

        let ancestor = if ancestor.round() <= accepted_low_quorum_round {
            // The block is within the accepted low quorum round, so include it as is.
            ancestor.clone()
        } else {
            // The block is too recent: exclude it and look for an earlier block of the same
            // author in the range starting after the last included round.
            excluded_and_equivocating_ancestors.insert(ancestor.reference());
            trace!(
                "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
                ancestor.reference(),
                ancestor.round()
            );
            node_metrics
                .excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname])
                .inc();

            // NOTE(review): the upper bound `accepted_low_quorum_round + 1` looks exclusive —
            // confirm against `DagState::get_last_cached_block_in_range`.
            match self.dag_state.read().get_last_cached_block_in_range(
                excluded_author,
                last_included_round + 1,
                accepted_low_quorum_round + 1,
            ) {
                Some(earlier_ancestor) => {
                    earlier_ancestor
                }
                None => {
                    continue;
                }
            }
        };
        // Record the inclusion right away because `try_new_block` only records the blocks
        // that end up in the returned ancestor list after this function returns.
        self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
        ancestors_to_propose.push(ancestor.clone());
        trace!(
            "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
            ancestor.reference()
        );
        node_metrics
            .included_excluded_proposal_ancestors_count_by_authority
            .with_label_values(&[block_hostname, "quorum"])
            .inc();
    }

    // Whatever path was taken, a parent-round quorum must exist at this point.
    assert!(
        parent_round_quorum.reached_threshold(&self.context.committee),
        "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
    );

    debug!(
        "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
        ancestors_to_propose.len(),
        excluded_and_equivocating_ancestors.len()
    );

    (ancestors_to_propose, excluded_and_equivocating_ancestors)
}
1244
1245 fn leaders_exist(&self, round: Round) -> bool {
1249 let dag_state = self.dag_state.read();
1250 for leader in self.leaders(round) {
1251 if !dag_state.contains_cached_block_at_slot(leader) {
1255 return false;
1256 }
1257 }
1258
1259 true
1260 }
1261
1262 fn leaders(&self, round: Round) -> Vec<Slot> {
1264 self.committer
1265 .get_leaders(round)
1266 .into_iter()
1267 .map(|authority_index| Slot::new(round, authority_index))
1268 .collect()
1269 }
1270
1271 fn first_leader(&self, round: Round) -> AuthorityIndex {
1273 self.leaders(round).first().unwrap().authority
1274 }
1275
1276 fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1277 self.last_proposed_block().timestamp_ms()
1278 }
1279
1280 fn last_proposed_round(&self) -> Round {
1281 self.last_proposed_block().round()
1282 }
1283
1284 fn last_proposed_block(&self) -> VerifiedBlock {
1285 self.dag_state.read().get_last_proposed_block()
1286 }
1287}
1288
/// Sending side of the notifications emitted by `Core`: newly proposed blocks and new rounds.
pub(crate) struct CoreSignals {
    /// Broadcast channel sender for own proposed blocks.
    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
    /// Watch channel sender holding the latest signaled round.
    new_round_sender: watch::Sender<Round>,
    /// Shared context; used here for the committee size and the broadcast channel capacity.
    context: Arc<Context>,
}
1295
1296impl CoreSignals {
1297 pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1298 let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1302 context.parameters.dag_state_cached_rounds as usize,
1303 );
1304 let (new_round_sender, new_round_receiver) = watch::channel(0);
1305
1306 let me = Self {
1307 tx_block_broadcast,
1308 new_round_sender,
1309 context,
1310 };
1311
1312 let receivers = CoreSignalsReceivers {
1313 rx_block_broadcast,
1314 new_round_receiver,
1315 };
1316
1317 (me, receivers)
1318 }
1319
1320 pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1323 if self.context.committee.size() > 1 {
1326 if extended_block.block.round() == GENESIS_ROUND {
1327 debug!("Ignoring broadcasting genesis block to peers");
1328 return Ok(());
1329 }
1330
1331 if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1332 warn!("Couldn't broadcast the block to any receiver: {err}");
1333 return Err(ConsensusError::Shutdown);
1334 }
1335 } else {
1336 debug!(
1337 "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1338 );
1339 }
1340 Ok(())
1341 }
1342
1343 pub(crate) fn new_round(&mut self, round_number: Round) {
1346 let _ = self.new_round_sender.send_replace(round_number);
1347 }
1348}
1349
1350pub(crate) struct CoreSignalsReceivers {
1353 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1354 new_round_receiver: watch::Receiver<Round>,
1355}
1356
1357impl CoreSignalsReceivers {
1358 pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1359 self.rx_block_broadcast.resubscribe()
1360 }
1361
1362 pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1363 self.new_round_receiver.clone()
1364 }
1365}
1366
/// Builds one `CoreTextFixture` per entry of `authorities`, using the entry's
/// position as the authority index. Every fixture is created with
/// `sync_last_known_own_block` set to `false`.
#[cfg(test)]
pub(crate) async fn create_cores(
    context: Context,
    authorities: Vec<Stake>,
) -> Vec<CoreTextFixture> {
    let total = authorities.len();
    let mut fixtures = Vec::new();

    for position in 0..total {
        let own_index = AuthorityIndex::new_for_test(position as u32);
        fixtures.push(
            CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await,
        );
    }

    fixtures
}
1384
/// Test fixture bundling a `Core` with the handles tests need to drive and
/// inspect it.
#[cfg(test)]
pub(crate) struct CoreTextFixture {
    pub(crate) core: Core,
    /// Clone of the certifier handed to `core`; used by `add_blocks`.
    pub(crate) transaction_certifier: TransactionCertifier,
    pub(crate) signal_receivers: CoreSignalsReceivers,
    /// Receiver subscribed to block broadcasts before `core` was constructed.
    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
    /// Kept only so the commit output channel is not closed while the fixture lives.
    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
    pub(crate) dag_state: Arc<RwLock<DagState>>,
    pub(crate) store: Arc<MemStore>,
}

#[cfg(test)]
impl CoreTextFixture {
    /// Builds a `Core` for `own_index` on top of an in-memory store.
    ///
    /// The committee is generated from `authorities` (one stake per member),
    /// the bad-nodes stake threshold is set to 33, and the leader schedule
    /// uses 10 commits per schedule.
    async fn new(
        context: Context,
        authorities: Vec<Stake>,
        own_index: AuthorityIndex,
        sync_last_known_own_block: bool,
    ) -> Self {
        // Both `authorities` and `context` are owned and not used again, so
        // they are moved rather than cloned.
        let (committee, mut signers) = local_committee_and_keys(0, authorities);
        let mut context = context
            .with_committee(committee)
            .with_authority_index(own_index);
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let leader_schedule = Arc::new(
            LeaderSchedule::from_store(context.clone(), dag_state.clone())
                .with_num_commits_per_schedule(10),
        );
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (blocks_sender, _blocks_receiver) =
            mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Subscribe before Core is created so blocks broadcast during construction are received.
        let block_receiver = signal_receivers.block_broadcast_receiver();

        let (commit_consumer, commit_output_receiver) = CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Take this authority's protocol key pair out of the generated signers.
        let block_signer = signers.remove(own_index.value()).1;

        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            transaction_certifier.clone(),
            block_manager,
            commit_observer,
            signals,
            block_signer,
            dag_state.clone(),
            sync_last_known_own_block,
            round_tracker,
        );

        Self {
            core,
            transaction_certifier,
            signal_receivers,
            block_receiver,
            _commit_output_receiver: commit_output_receiver,
            dag_state,
            store,
        }
    }

    /// Registers `blocks` with the transaction certifier (with no rejected
    /// transactions) and then adds them to Core, returning Core's result.
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        self.transaction_certifier
            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
        self.core.add_blocks(blocks)
    }
}
1483
1484#[cfg(test)]
1485mod test {
1486 use std::{collections::BTreeSet, time::Duration};
1487
1488 use consensus_config::{AuthorityIndex, Parameters};
1489 use consensus_types::block::TransactionIndex;
1490 use futures::{StreamExt, stream::FuturesUnordered};
1491 use mysten_metrics::monitored_mpsc;
1492 use sui_protocol_config::ProtocolConfig;
1493 use tokio::time::sleep;
1494
1495 use super::*;
1496 use crate::{
1497 CommitConsumerArgs, CommitIndex,
1498 block::{TestBlock, genesis_blocks},
1499 block_verifier::NoopBlockVerifier,
1500 commit::CommitAPI,
1501 leader_scoring::ReputationScores,
1502 storage::{Store, WriteBatch, mem_store::MemStore},
1503 test_dag_builder::DagBuilder,
1504 test_dag_parser::parse_dag,
1505 transaction::{BlockStatus, TransactionClient},
1506 };
1507
1508 #[tokio::test]
1510 async fn test_core_recover_from_store_for_full_round() {
1511 telemetry_subscribers::init_for_testing();
1512 let (context, mut key_pairs) = Context::new_for_test(4);
1513 let context = Arc::new(context);
1514 let store = Arc::new(MemStore::new());
1515 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1516 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1517 let mut block_status_subscriptions = FuturesUnordered::new();
1518
1519 let mut last_round_blocks = genesis_blocks(&context);
1521 let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1522 for round in 1..=4 {
1523 let mut this_round_blocks = Vec::new();
1524 for (index, _authority) in context.committee.authorities() {
1525 let block = VerifiedBlock::new_for_test(
1526 TestBlock::new(round, index.value() as u32)
1527 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1528 .build(),
1529 );
1530
1531 if round == 1 && index == context.own_index {
1533 let subscription =
1534 transaction_consumer.subscribe_for_block_status_testing(block.reference());
1535 block_status_subscriptions.push(subscription);
1536 }
1537
1538 this_round_blocks.push(block);
1539 }
1540 all_blocks.extend(this_round_blocks.clone());
1541 last_round_blocks = this_round_blocks;
1542 }
1543 store
1545 .write(WriteBatch::default().blocks(all_blocks))
1546 .expect("Storage error");
1547
1548 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1550 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1551 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1552 context.clone(),
1553 dag_state.clone(),
1554 ));
1555 let (blocks_sender, _blocks_receiver) =
1556 monitored_mpsc::unbounded_channel("consensus_block_output");
1557 let transaction_certifier = TransactionCertifier::new(
1558 context.clone(),
1559 Arc::new(NoopBlockVerifier {}),
1560 dag_state.clone(),
1561 blocks_sender,
1562 );
1563
1564 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1565 let commit_observer = CommitObserver::new(
1566 context.clone(),
1567 commit_consumer,
1568 dag_state.clone(),
1569 transaction_certifier.clone(),
1570 leader_schedule.clone(),
1571 )
1572 .await;
1573
1574 let last_commit = store.read_last_commit().unwrap();
1576 assert!(last_commit.is_none());
1577 assert_eq!(dag_state.read().last_commit_index(), 0);
1578
1579 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1581 let (blocks_sender, _blocks_receiver) =
1582 monitored_mpsc::unbounded_channel("consensus_block_output");
1583 let transaction_certifier = TransactionCertifier::new(
1584 context.clone(),
1585 Arc::new(NoopBlockVerifier {}),
1586 dag_state.clone(),
1587 blocks_sender,
1588 );
1589 transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1590 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1592 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1593 let _core = Core::new(
1594 context.clone(),
1595 leader_schedule,
1596 transaction_consumer,
1597 transaction_certifier.clone(),
1598 block_manager,
1599 commit_observer,
1600 signals,
1601 key_pairs.remove(context.own_index.value()).1,
1602 dag_state.clone(),
1603 false,
1604 round_tracker,
1605 );
1606
1607 let mut new_round = signal_receivers.new_round_receiver();
1609 assert_eq!(*new_round.borrow_and_update(), 5);
1610
1611 let proposed_block = block_receiver
1613 .recv()
1614 .await
1615 .expect("A block should have been created");
1616 assert_eq!(proposed_block.block.round(), 5);
1617 let ancestors = proposed_block.block.ancestors();
1618
1619 assert_eq!(ancestors.len(), 4);
1621 for ancestor in ancestors {
1622 assert_eq!(ancestor.round, 4);
1623 }
1624
1625 dag_state.write().flush();
1627
1628 let last_commit = store
1632 .read_last_commit()
1633 .unwrap()
1634 .expect("last commit should be set");
1635 assert_eq!(last_commit.index(), 2);
1636 assert_eq!(dag_state.read().last_commit_index(), 2);
1637 let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1638 assert_eq!(all_stored_commits.len(), 2);
1639
1640 while let Some(result) = block_status_subscriptions.next().await {
1642 let status = result.unwrap();
1643 assert!(matches!(status, BlockStatus::Sequenced(_)));
1644 }
1645 }
1646
1647 #[tokio::test]
1650 async fn test_core_recover_from_store_for_partial_round() {
1651 telemetry_subscribers::init_for_testing();
1652
1653 let (context, mut key_pairs) = Context::new_for_test(4);
1654 let context = Arc::new(context);
1655 let store = Arc::new(MemStore::new());
1656 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1657 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1658
1659 let mut last_round_blocks = genesis_blocks(&context);
1661 let mut all_blocks = last_round_blocks.clone();
1662 for round in 1..=4 {
1663 let mut this_round_blocks = Vec::new();
1664
1665 let authorities_to_skip = if round == 4 {
1667 context.committee.validity_threshold() as usize
1668 } else {
1669 1
1671 };
1672
1673 for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1674 let block = TestBlock::new(round, index.value() as u32)
1675 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1676 .build();
1677 this_round_blocks.push(VerifiedBlock::new_for_test(block));
1678 }
1679 all_blocks.extend(this_round_blocks.clone());
1680 last_round_blocks = this_round_blocks;
1681 }
1682
1683 store
1685 .write(WriteBatch::default().blocks(all_blocks))
1686 .expect("Storage error");
1687
1688 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1690 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1691 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1692 context.clone(),
1693 dag_state.clone(),
1694 ));
1695 let (blocks_sender, _blocks_receiver) =
1696 monitored_mpsc::unbounded_channel("consensus_block_output");
1697 let transaction_certifier = TransactionCertifier::new(
1698 context.clone(),
1699 Arc::new(NoopBlockVerifier {}),
1700 dag_state.clone(),
1701 blocks_sender,
1702 );
1703
1704 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1705 let commit_observer = CommitObserver::new(
1706 context.clone(),
1707 commit_consumer,
1708 dag_state.clone(),
1709 transaction_certifier.clone(),
1710 leader_schedule.clone(),
1711 )
1712 .await;
1713
1714 let last_commit = store.read_last_commit().unwrap();
1716 assert!(last_commit.is_none());
1717 assert_eq!(dag_state.read().last_commit_index(), 0);
1718
1719 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1721 let (blocks_sender, _blocks_receiver) =
1722 monitored_mpsc::unbounded_channel("consensus_block_output");
1723 let transaction_certifier = TransactionCertifier::new(
1724 context.clone(),
1725 Arc::new(NoopBlockVerifier {}),
1726 dag_state.clone(),
1727 blocks_sender,
1728 );
1729 transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1730 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1732 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1733 let mut core = Core::new(
1734 context.clone(),
1735 leader_schedule,
1736 transaction_consumer,
1737 transaction_certifier,
1738 block_manager,
1739 commit_observer,
1740 signals,
1741 key_pairs.remove(context.own_index.value()).1,
1742 dag_state.clone(),
1743 false,
1744 round_tracker,
1745 );
1746
1747 let mut new_round = signal_receivers.new_round_receiver();
1750 assert_eq!(*new_round.borrow_and_update(), 5);
1751
1752 let proposed_block = block_receiver
1754 .recv()
1755 .await
1756 .expect("A block should have been created");
1757 assert_eq!(proposed_block.block.round(), 4);
1758 let ancestors = proposed_block.block.ancestors();
1759
1760 assert_eq!(ancestors.len(), 4);
1761 for ancestor in ancestors {
1762 if ancestor.author == context.own_index {
1763 assert_eq!(ancestor.round, 0);
1764 } else {
1765 assert_eq!(ancestor.round, 3);
1766 }
1767 }
1768
1769 core.try_commit(vec![]).ok();
1771
1772 core.dag_state.write().flush();
1774
1775 let last_commit = store
1779 .read_last_commit()
1780 .unwrap()
1781 .expect("last commit should be set");
1782 assert_eq!(last_commit.index(), 2);
1783 assert_eq!(dag_state.read().last_commit_index(), 2);
1784 let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1785 assert_eq!(all_stored_commits.len(), 2);
1786 }
1787
1788 #[tokio::test]
1789 async fn test_core_propose_after_genesis() {
1790 telemetry_subscribers::init_for_testing();
1791 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1792 config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1793 config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1794 config
1795 });
1796
1797 let (context, mut key_pairs) = Context::new_for_test(4);
1798 let context = Arc::new(context);
1799 let store = Arc::new(MemStore::new());
1800 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1801
1802 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1803 let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1804 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1805 let (blocks_sender, _blocks_receiver) =
1806 monitored_mpsc::unbounded_channel("consensus_block_output");
1807 let transaction_certifier = TransactionCertifier::new(
1808 context.clone(),
1809 Arc::new(NoopBlockVerifier {}),
1810 dag_state.clone(),
1811 blocks_sender,
1812 );
1813 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1814 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1816 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1817 context.clone(),
1818 dag_state.clone(),
1819 ));
1820
1821 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1822 let commit_observer = CommitObserver::new(
1823 context.clone(),
1824 commit_consumer,
1825 dag_state.clone(),
1826 transaction_certifier.clone(),
1827 leader_schedule.clone(),
1828 )
1829 .await;
1830
1831 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1832 let mut core = Core::new(
1833 context.clone(),
1834 leader_schedule,
1835 transaction_consumer,
1836 transaction_certifier,
1837 block_manager,
1838 commit_observer,
1839 signals,
1840 key_pairs.remove(context.own_index.value()).1,
1841 dag_state.clone(),
1842 false,
1843 round_tracker,
1844 );
1845
1846 let mut total = 0;
1848 let mut index = 0;
1849 loop {
1850 let transaction =
1851 bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1852 total += transaction.len();
1853 index += 1;
1854 let _w = transaction_client
1855 .submit_no_wait(vec![transaction])
1856 .await
1857 .unwrap();
1858
1859 if total >= 1_000 {
1861 break;
1862 }
1863 }
1864
1865 let extended_block = block_receiver
1867 .recv()
1868 .await
1869 .expect("A new block should have been created");
1870
1871 assert_eq!(extended_block.block.round(), 1);
1873 assert_eq!(extended_block.block.author().value(), 0);
1874 assert_eq!(extended_block.block.ancestors().len(), 4);
1875
1876 let mut total = 0;
1877 for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1878 total += transaction.data().len() as u64;
1879 let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1880 assert_eq!(format!("Transaction {i}"), transaction);
1881 }
1882 assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
1883
1884 let all_genesis = genesis_blocks(&context);
1886
1887 for ancestor in extended_block.block.ancestors() {
1888 all_genesis
1889 .iter()
1890 .find(|block| block.reference() == *ancestor)
1891 .expect("Block should be found amongst genesis blocks");
1892 }
1893
1894 assert!(core.try_propose(false).unwrap().is_none());
1896 assert!(core.try_propose(true).unwrap().is_none());
1897
1898 dag_state.write().flush();
1900
1901 let last_commit = store.read_last_commit().unwrap();
1903 assert!(last_commit.is_none());
1904 assert_eq!(dag_state.read().last_commit_index(), 0);
1905 }
1906
1907 #[tokio::test]
1908 async fn test_core_propose_once_receiving_a_quorum() {
1909 telemetry_subscribers::init_for_testing();
1910 let (context, _key_pairs) = Context::new_for_test(4);
1911 let mut core_fixture = CoreTextFixture::new(
1912 context.clone(),
1913 vec![1, 1, 1, 1],
1914 AuthorityIndex::new_for_test(0),
1915 false,
1916 )
1917 .await;
1918 let transaction_certifier = &core_fixture.transaction_certifier;
1919 let store = &core_fixture.store;
1920 let dag_state = &core_fixture.dag_state;
1921 let core = &mut core_fixture.core;
1922
1923 let mut expected_ancestors = BTreeSet::new();
1924
1925 let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1927 expected_ancestors.insert(block_1.reference());
1928 sleep(context.parameters.min_round_delay).await;
1930 transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1932 _ = core.add_blocks(vec![block_1]);
1933
1934 assert_eq!(core.last_proposed_round(), 1);
1935 expected_ancestors.insert(core.last_proposed_block().reference());
1936 assert!(core.try_propose(false).unwrap().is_none());
1938
1939 let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1941 expected_ancestors.insert(block_2.reference());
1942 sleep(context.parameters.min_round_delay).await;
1944 transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1946 _ = core.add_blocks(vec![block_2.clone()]);
1947
1948 assert_eq!(core.last_proposed_round(), 2);
1949
1950 let proposed_block = core.last_proposed_block();
1951 assert_eq!(proposed_block.round(), 2);
1952 assert_eq!(proposed_block.author(), context.own_index);
1953 assert_eq!(proposed_block.ancestors().len(), 3);
1954 let ancestors = proposed_block.ancestors();
1955 let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1956 assert_eq!(ancestors, expected_ancestors);
1957
1958 let transaction_votes = proposed_block.transaction_votes();
1959 assert_eq!(transaction_votes.len(), 1);
1960 let transaction_vote = transaction_votes.first().unwrap();
1961 assert_eq!(transaction_vote.block_ref, block_2.reference());
1962 assert_eq!(transaction_vote.rejects, vec![1, 4]);
1963
1964 dag_state.write().flush();
1966
1967 let last_commit = store.read_last_commit().unwrap();
1969 assert!(last_commit.is_none());
1970 assert_eq!(dag_state.read().last_commit_index(), 0);
1971 }
1972
1973 #[tokio::test]
1974 async fn test_commit_and_notify_for_block_status() {
1975 telemetry_subscribers::init_for_testing();
1976 let (mut context, mut key_pairs) = Context::new_for_test(4);
1977 const GC_DEPTH: u32 = 2;
1978
1979 context
1980 .protocol_config
1981 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1982
1983 let context = Arc::new(context);
1984
1985 let store = Arc::new(MemStore::new());
1986 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1987 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1988 let mut block_status_subscriptions = FuturesUnordered::new();
1989
1990 let dag_str = "DAG {
1991 Round 0 : { 4 },
1992 Round 1 : { * },
1993 Round 2 : { * },
1994 Round 3 : {
1995 A -> [*],
1996 B -> [-A2],
1997 C -> [-A2],
1998 D -> [-A2],
1999 },
2000 Round 4 : {
2001 B -> [-A3],
2002 C -> [-A3],
2003 D -> [-A3],
2004 },
2005 Round 5 : {
2006 A -> [A3, B4, C4, D4]
2007 B -> [*],
2008 C -> [*],
2009 D -> [*],
2010 },
2011 Round 6 : { * },
2012 Round 7 : { * },
2013 Round 8 : { * },
2014 }";
2015
2016 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2017 dag_builder.print();
2018
2019 for block in dag_builder.blocks(1..=5) {
2021 if block.author() == context.own_index {
2022 let subscription =
2023 transaction_consumer.subscribe_for_block_status_testing(block.reference());
2024 block_status_subscriptions.push(subscription);
2025 }
2026 }
2027
2028 store
2030 .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2031 .expect("Storage error");
2032
2033 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2035 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2036 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2037 context.clone(),
2038 dag_state.clone(),
2039 ));
2040 let (blocks_sender, _blocks_receiver) =
2041 monitored_mpsc::unbounded_channel("consensus_block_output");
2042 let transaction_certifier = TransactionCertifier::new(
2043 context.clone(),
2044 Arc::new(NoopBlockVerifier {}),
2045 dag_state.clone(),
2046 blocks_sender,
2047 );
2048
2049 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2050 let commit_observer = CommitObserver::new(
2051 context.clone(),
2052 commit_consumer,
2053 dag_state.clone(),
2054 transaction_certifier.clone(),
2055 leader_schedule.clone(),
2056 )
2057 .await;
2058
2059 dag_state.write().flush();
2061
2062 let last_commit = store.read_last_commit().unwrap();
2064 assert!(last_commit.is_none());
2065 assert_eq!(dag_state.read().last_commit_index(), 0);
2066
2067 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2069 let (blocks_sender, _blocks_receiver) =
2070 monitored_mpsc::unbounded_channel("consensus_block_output");
2071 let transaction_certifier = TransactionCertifier::new(
2072 context.clone(),
2073 Arc::new(NoopBlockVerifier {}),
2074 dag_state.clone(),
2075 blocks_sender,
2076 );
2077 transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
2078 let _block_receiver = signal_receivers.block_broadcast_receiver();
2080 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2081 let _core = Core::new(
2082 context.clone(),
2083 leader_schedule,
2084 transaction_consumer,
2085 transaction_certifier,
2086 block_manager,
2087 commit_observer,
2088 signals,
2089 key_pairs.remove(context.own_index.value()).1,
2090 dag_state.clone(),
2091 false,
2092 round_tracker,
2093 );
2094
2095 dag_state.write().flush();
2097
2098 let last_commit = store
2099 .read_last_commit()
2100 .unwrap()
2101 .expect("last commit should be set");
2102
2103 assert_eq!(last_commit.index(), 5);
2104
2105 while let Some(result) = block_status_subscriptions.next().await {
2106 let status = result.unwrap();
2107
2108 match status {
2109 BlockStatus::Sequenced(block_ref) => {
2110 assert!(block_ref.round == 1 || block_ref.round == 5);
2111 }
2112 BlockStatus::GarbageCollected(block_ref) => {
2113 assert!(block_ref.round == 2 || block_ref.round == 3);
2114 }
2115 }
2116 }
2117 }
2118
2119 #[tokio::test]
2122 async fn test_multiple_commits_advance_threshold_clock() {
2123 telemetry_subscribers::init_for_testing();
2124 let (mut context, mut key_pairs) = Context::new_for_test(4);
2125 const GC_DEPTH: u32 = 2;
2126
2127 context
2128 .protocol_config
2129 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2130
2131 let context = Arc::new(context);
2132
2133 let store = Arc::new(MemStore::new());
2134 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2135 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2136
2137 let dag_str = "DAG {
2141 Round 0 : { 4 },
2142 Round 1 : { * },
2143 Round 2 : {
2144 B -> [-D1],
2145 C -> [-D1],
2146 D -> [-D1],
2147 },
2148 Round 3 : {
2149 B -> [*],
2150 C -> [*]
2151 D -> [*],
2152 },
2153 Round 4 : {
2154 A -> [*],
2155 B -> [*],
2156 C -> [*]
2157 D -> [*],
2158 },
2159 Round 5 : {
2160 A -> [*],
2161 B -> [*],
2162 C -> [*],
2163 D -> [*],
2164 },
2165 Round 6 : {
2166 B -> [A5, B5, C5, D1],
2167 C -> [A5, B5, C5, D1],
2168 D -> [A5, B5, C5, D1],
2169 },
2170 Round 7 : {
2171 B -> [*],
2172 C -> [*],
2173 D -> [*],
2174 },
2175 Round 8 : {
2176 B -> [*],
2177 C -> [*],
2178 D -> [*],
2179 },
2180 Round 9 : {
2181 B -> [*],
2182 C -> [*],
2183 D -> [*],
2184 },
2185 Round 10 : {
2186 B -> [*],
2187 C -> [*],
2188 D -> [*],
2189 },
2190 Round 11 : {
2191 B -> [*],
2192 C -> [*],
2193 D -> [*],
2194 },
2195 }";
2196
2197 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2198 dag_builder.print();
2199
2200 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2202 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2203 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2204 context.clone(),
2205 dag_state.clone(),
2206 ));
2207 let (blocks_sender, _blocks_receiver) =
2208 monitored_mpsc::unbounded_channel("consensus_block_output");
2209 let transaction_certifier = TransactionCertifier::new(
2210 context.clone(),
2211 Arc::new(NoopBlockVerifier {}),
2212 dag_state.clone(),
2213 blocks_sender,
2214 );
2215
2216 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2217 let commit_observer = CommitObserver::new(
2218 context.clone(),
2219 commit_consumer,
2220 dag_state.clone(),
2221 transaction_certifier.clone(),
2222 leader_schedule.clone(),
2223 )
2224 .await;
2225
2226 dag_state.write().flush();
2228
2229 let last_commit = store.read_last_commit().unwrap();
2231 assert!(last_commit.is_none());
2232 assert_eq!(dag_state.read().last_commit_index(), 0);
2233
2234 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2236 let (blocks_sender, _blocks_receiver) =
2237 monitored_mpsc::unbounded_channel("consensus_block_output");
2238 let transaction_certifier = TransactionCertifier::new(
2239 context.clone(),
2240 Arc::new(NoopBlockVerifier {}),
2241 dag_state.clone(),
2242 blocks_sender,
2243 );
2244 let _block_receiver = signal_receivers.block_broadcast_receiver();
2246 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2247 let mut core = Core::new(
2248 context.clone(),
2249 leader_schedule,
2250 transaction_consumer,
2251 transaction_certifier.clone(),
2252 block_manager,
2253 commit_observer,
2254 signals,
2255 key_pairs.remove(context.own_index.value()).1,
2256 dag_state.clone(),
2257 true,
2258 round_tracker,
2259 );
2260 core.set_last_known_proposed_round(4);
2263
2264 let mut all_blocks = dag_builder.blocks(1..=11);
2270 all_blocks.sort_by_key(|b| b.round());
2271 let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
2272 all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
2273 transaction_certifier.add_voted_blocks(voted_blocks);
2274 let blocks: Vec<VerifiedBlock> = all_blocks
2275 .into_iter()
2276 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2277 .collect();
2278 core.add_blocks(blocks).expect("Should not fail");
2279
2280 assert_eq!(core.last_proposed_round(), 12);
2281 }
2282
2283 #[tokio::test]
2284 async fn test_core_set_min_propose_round() {
2285 telemetry_subscribers::init_for_testing();
2286 let (context, mut key_pairs) = Context::new_for_test(4);
2287 let context = Arc::new(context.with_parameters(Parameters {
2288 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2289 ..Default::default()
2290 }));
2291
2292 let store = Arc::new(MemStore::new());
2293 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2294
2295 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2296 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2297 context.clone(),
2298 dag_state.clone(),
2299 ));
2300
2301 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2302 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2303 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2304 let (blocks_sender, _blocks_receiver) =
2305 monitored_mpsc::unbounded_channel("consensus_block_output");
2306 let transaction_certifier = TransactionCertifier::new(
2307 context.clone(),
2308 Arc::new(NoopBlockVerifier {}),
2309 dag_state.clone(),
2310 blocks_sender,
2311 );
2312 let _block_receiver = signal_receivers.block_broadcast_receiver();
2314
2315 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2316 let commit_observer = CommitObserver::new(
2317 context.clone(),
2318 commit_consumer,
2319 dag_state.clone(),
2320 transaction_certifier.clone(),
2321 leader_schedule.clone(),
2322 )
2323 .await;
2324
2325 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2326 let mut core = Core::new(
2327 context.clone(),
2328 leader_schedule,
2329 transaction_consumer,
2330 transaction_certifier.clone(),
2331 block_manager,
2332 commit_observer,
2333 signals,
2334 key_pairs.remove(context.own_index.value()).1,
2335 dag_state.clone(),
2336 true,
2337 round_tracker,
2338 );
2339
2340 assert_eq!(
2342 core.last_proposed_round(),
2343 GENESIS_ROUND,
2344 "No block should have been created other than genesis"
2345 );
2346
2347 assert!(core.try_propose(true).unwrap().is_none());
2349
2350 let mut builder = DagBuilder::new(context.clone());
2352 builder.layers(1..=10).build();
2353
2354 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2355
2356 transaction_certifier
2358 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2359 assert!(core.add_blocks(blocks).unwrap().is_empty());
2360
2361 core.round_tracker.write().update_from_probe(
2362 vec![
2363 vec![10, 10, 10, 10],
2364 vec![10, 10, 10, 10],
2365 vec![10, 10, 10, 10],
2366 vec![10, 10, 10, 10],
2367 ],
2368 vec![
2369 vec![10, 10, 10, 10],
2370 vec![10, 10, 10, 10],
2371 vec![10, 10, 10, 10],
2372 vec![10, 10, 10, 10],
2373 ],
2374 );
2375
2376 assert!(core.try_propose(true).unwrap().is_none());
2378
2379 core.set_last_known_proposed_round(10);
2382
2383 let block = core.try_propose(true).expect("No error").unwrap();
2384 assert_eq!(block.round(), 11);
2385 assert_eq!(block.ancestors().len(), 4);
2386
2387 let our_ancestor_included = block.ancestors()[0];
2388 assert_eq!(our_ancestor_included.author, context.own_index);
2389 assert_eq!(our_ancestor_included.round, 10);
2390 }
2391
2392 #[tokio::test(flavor = "current_thread", start_paused = true)]
2393 async fn test_core_try_new_block_leader_timeout() {
2394 telemetry_subscribers::init_for_testing();
2395
2396 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2403 let now = context.clock.timestamp_utc_ms();
2405 let max_timestamp = blocks
2406 .iter()
2407 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2408 .map(|block| block.timestamp_ms())
2409 .unwrap_or(0);
2410
2411 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2412 sleep(wait_time).await;
2413 }
2414
2415 let (context, _) = Context::new_for_test(4);
2416 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2418
2419 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2424
2425 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2427 for round in 1..=3 {
2428 let mut this_round_blocks = Vec::new();
2429
2430 for core_fixture in cores.iter_mut() {
2431 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2432
2433 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2434
2435 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2437 assert_eq!(round - 1, r);
2438 if core_fixture.core.last_proposed_round() == r {
2439 core_fixture
2441 .core
2442 .try_propose(true)
2443 .unwrap()
2444 .unwrap_or_else(|| {
2445 panic!("Block should have been proposed for round {}", round)
2446 });
2447 }
2448 }
2449
2450 assert_eq!(core_fixture.core.last_proposed_round(), round);
2451
2452 this_round_blocks.push(core_fixture.core.last_proposed_block());
2453 }
2454
2455 last_round_blocks = this_round_blocks;
2456 }
2457
2458 for core_fixture in cores.iter_mut() {
2461 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2462
2463 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2464 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2465 }
2466
2467 for core_fixture in cores.iter_mut() {
2470 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2471 assert_eq!(core_fixture.core.last_proposed_round(), 4);
2472
2473 core_fixture.dag_state.write().flush();
2475
2476 let last_commit = core_fixture
2478 .store
2479 .read_last_commit()
2480 .unwrap()
2481 .expect("last commit should be set");
2482 assert_eq!(last_commit.index(), 1);
2485 let all_stored_commits = core_fixture
2486 .store
2487 .scan_commits((0..=CommitIndex::MAX).into())
2488 .unwrap();
2489 assert_eq!(all_stored_commits.len(), 1);
2490 }
2491 }
2492
2493 #[tokio::test(flavor = "current_thread", start_paused = true)]
2494 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2495 telemetry_subscribers::init_for_testing();
2496
2497 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2504 let now = context.clock.timestamp_utc_ms();
2506 let max_timestamp = blocks
2507 .iter()
2508 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2509 .map(|block| block.timestamp_ms())
2510 .unwrap_or(0);
2511
2512 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2513 sleep(wait_time).await;
2514 }
2515
2516 let (mut context, _) = Context::new_for_test(5);
2517 context
2518 .protocol_config
2519 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2520
2521 let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2523 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2524
2525 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2527 for round in 1..=30 {
2528 let mut this_round_blocks = Vec::new();
2529
2530 for core_fixture in cores.iter_mut() {
2531 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2532
2533 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2534
2535 core_fixture.core.round_tracker.write().update_from_probe(
2536 vec![
2537 vec![round, round, round, round, 0],
2538 vec![round, round, round, round, 0],
2539 vec![round, round, round, round, 0],
2540 vec![round, round, round, round, 0],
2541 vec![0, 0, 0, 0, 0],
2542 ],
2543 vec![
2544 vec![round, round, round, round, 0],
2545 vec![round, round, round, round, 0],
2546 vec![round, round, round, round, 0],
2547 vec![round, round, round, round, 0],
2548 vec![0, 0, 0, 0, 0],
2549 ],
2550 );
2551
2552 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2554 assert_eq!(round - 1, r);
2555 if core_fixture.core.last_proposed_round() == r {
2556 core_fixture
2558 .core
2559 .try_propose(true)
2560 .unwrap()
2561 .unwrap_or_else(|| {
2562 panic!("Block should have been proposed for round {}", round)
2563 });
2564 }
2565 }
2566
2567 assert_eq!(core_fixture.core.last_proposed_round(), round);
2568
2569 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2570 }
2571
2572 last_round_blocks = this_round_blocks;
2573 }
2574
2575 for round in 31..=40 {
2577 let mut this_round_blocks = Vec::new();
2578
2579 for core_fixture in all_cores.iter_mut() {
2580 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2581
2582 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2583
2584 core_fixture.core.round_tracker.write().update_from_probe(
2587 vec![
2588 vec![round, round, round, round, 0],
2589 vec![round, round, round, round, 0],
2590 vec![round, round, round, round, 0],
2591 vec![round, round, round, round, 0],
2592 vec![0, 0, 0, 0, 0],
2593 ],
2594 vec![
2595 vec![round, round, round, round, 0],
2596 vec![round, round, round, round, 0],
2597 vec![round, round, round, round, 0],
2598 vec![round, round, round, round, 0],
2599 vec![0, 0, 0, 0, 0],
2600 ],
2601 );
2602
2603 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2605 assert_eq!(round - 1, r);
2606 if core_fixture.core.last_proposed_round() == r {
2607 core_fixture
2609 .core
2610 .try_propose(true)
2611 .unwrap()
2612 .unwrap_or_else(|| {
2613 panic!("Block should have been proposed for round {}", round)
2614 });
2615 }
2616 }
2617
2618 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2619
2620 for block in this_round_blocks.iter() {
2621 if block.author() != AuthorityIndex::new_for_test(4) {
2622 assert_eq!(block.ancestors().len(), 4);
2625 } else {
2626 assert_eq!(block.ancestors().len(), 5);
2629 }
2630 }
2631 }
2632
2633 last_round_blocks = this_round_blocks;
2634 }
2635 }
2636
2637 #[tokio::test]
2638 async fn test_smart_ancestor_selection() {
2639 telemetry_subscribers::init_for_testing();
2640 let (mut context, mut key_pairs) = Context::new_for_test(7);
2641 context
2642 .protocol_config
2643 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2644 let context = Arc::new(context.with_parameters(Parameters {
2645 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2646 ..Default::default()
2647 }));
2648
2649 let store = Arc::new(MemStore::new());
2650 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2651
2652 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2653 let leader_schedule = Arc::new(
2654 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2655 .with_num_commits_per_schedule(10),
2656 );
2657
2658 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2659 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2660 let (blocks_sender, _blocks_receiver) =
2661 monitored_mpsc::unbounded_channel("consensus_block_output");
2662 let transaction_certifier = TransactionCertifier::new(
2663 context.clone(),
2664 Arc::new(NoopBlockVerifier {}),
2665 dag_state.clone(),
2666 blocks_sender,
2667 );
2668 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2669 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2671
2672 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2673 let commit_observer = CommitObserver::new(
2674 context.clone(),
2675 commit_consumer,
2676 dag_state.clone(),
2677 transaction_certifier.clone(),
2678 leader_schedule.clone(),
2679 )
2680 .await;
2681
2682 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2683 let mut core = Core::new(
2684 context.clone(),
2685 leader_schedule,
2686 transaction_consumer,
2687 transaction_certifier.clone(),
2688 block_manager,
2689 commit_observer,
2690 signals,
2691 key_pairs.remove(context.own_index.value()).1,
2692 dag_state.clone(),
2693 true,
2694 round_tracker.clone(),
2695 );
2696
2697 assert_eq!(
2699 core.last_proposed_round(),
2700 GENESIS_ROUND,
2701 "No block should have been created other than genesis"
2702 );
2703
2704 assert!(core.try_propose(true).unwrap().is_none());
2706
2707 let mut builder = DagBuilder::new(context.clone());
2709 builder
2710 .layers(1..=12)
2711 .authorities(vec![AuthorityIndex::new_for_test(1)])
2712 .skip_block()
2713 .build();
2714 let blocks = builder.blocks(1..=12);
2715 transaction_certifier
2717 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2718 assert!(core.add_blocks(blocks).unwrap().is_empty());
2719 core.set_last_known_proposed_round(12);
2720
2721 round_tracker.write().update_from_probe(
2722 vec![
2723 vec![12, 12, 12, 12, 12, 12, 12],
2724 vec![0, 0, 0, 0, 0, 0, 0],
2725 vec![12, 12, 12, 12, 12, 12, 12],
2726 vec![12, 12, 12, 12, 12, 12, 12],
2727 vec![12, 12, 12, 12, 12, 12, 12],
2728 vec![12, 12, 12, 12, 12, 12, 12],
2729 vec![12, 12, 12, 12, 12, 12, 12],
2730 ],
2731 vec![
2732 vec![12, 12, 12, 12, 12, 12, 12],
2733 vec![0, 0, 0, 0, 0, 0, 0],
2734 vec![12, 12, 12, 12, 12, 12, 12],
2735 vec![12, 12, 12, 12, 12, 12, 12],
2736 vec![12, 12, 12, 12, 12, 12, 12],
2737 vec![12, 12, 12, 12, 12, 12, 12],
2738 vec![12, 12, 12, 12, 12, 12, 12],
2739 ],
2740 );
2741
2742 let block = core.try_propose(true).expect("No error").unwrap();
2743 assert_eq!(block.round(), 13);
2744 assert_eq!(block.ancestors().len(), 7);
2745
2746 builder
2748 .layers(13..=14)
2749 .authorities(vec![AuthorityIndex::new_for_test(0)])
2750 .skip_block()
2751 .build();
2752 let blocks = builder.blocks(13..=14);
2753 transaction_certifier
2754 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2755 assert!(core.add_blocks(blocks).unwrap().is_empty());
2756
2757 let block = core.try_propose(true).expect("No error").unwrap();
2760 assert_eq!(block.round(), 15);
2761 assert_eq!(block.ancestors().len(), 6);
2762
2763 let round_14_ancestors = builder.last_ancestors.clone();
2766 builder
2767 .layer(15)
2768 .authorities(vec![
2769 AuthorityIndex::new_for_test(0),
2770 AuthorityIndex::new_for_test(5),
2771 AuthorityIndex::new_for_test(6),
2772 ])
2773 .skip_block()
2774 .build();
2775 let blocks = builder.blocks(15..=15);
2776 let authority_1_excluded_block_reference = blocks
2777 .iter()
2778 .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2779 .unwrap()
2780 .reference();
2781 sleep(context.parameters.min_round_delay).await;
2783 transaction_certifier
2785 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2786 assert!(core.add_blocks(blocks).unwrap().is_empty());
2787 assert_eq!(core.last_proposed_block().round(), 15);
2788
2789 builder
2790 .layer(15)
2791 .authorities(vec![
2792 AuthorityIndex::new_for_test(0),
2793 AuthorityIndex::new_for_test(1),
2794 AuthorityIndex::new_for_test(2),
2795 AuthorityIndex::new_for_test(3),
2796 AuthorityIndex::new_for_test(4),
2797 ])
2798 .skip_block()
2799 .override_last_ancestors(round_14_ancestors)
2800 .build();
2801 let blocks = builder.blocks(15..=15);
2802 let round_15_ancestors: Vec<BlockRef> = blocks
2803 .iter()
2804 .filter(|block| block.round() == 15)
2805 .map(|block| block.reference())
2806 .collect();
2807 let included_block_references = iter::once(&core.last_proposed_block())
2808 .chain(blocks.iter())
2809 .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2810 .map(|block| block.reference())
2811 .collect::<Vec<_>>();
2812
2813 transaction_certifier
2815 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2816 assert!(core.add_blocks(blocks).unwrap().is_empty());
2817 assert_eq!(core.last_proposed_block().round(), 16);
2818
2819 let extended_block = loop {
2821 let extended_block =
2822 tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2823 .await
2824 .unwrap()
2825 .unwrap();
2826 if extended_block.block.round() == 16 {
2827 break extended_block;
2828 }
2829 };
2830 assert_eq!(extended_block.block.round(), 16);
2831 assert_eq!(extended_block.block.author(), core.context.own_index);
2832 assert_eq!(extended_block.block.ancestors().len(), 6);
2833 assert_eq!(extended_block.block.ancestors(), included_block_references);
2834 assert_eq!(extended_block.excluded_ancestors.len(), 1);
2835 assert_eq!(
2836 extended_block.excluded_ancestors[0],
2837 authority_1_excluded_block_reference
2838 );
2839
2840 builder
2845 .layer(16)
2846 .authorities(vec![
2847 AuthorityIndex::new_for_test(0),
2848 AuthorityIndex::new_for_test(5),
2849 AuthorityIndex::new_for_test(6),
2850 ])
2851 .skip_block()
2852 .override_last_ancestors(round_15_ancestors)
2853 .build();
2854 let blocks = builder.blocks(16..=16);
2855 sleep(context.parameters.min_round_delay).await;
2857 transaction_certifier
2859 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2860 assert!(core.add_blocks(blocks).unwrap().is_empty());
2861 assert_eq!(core.last_proposed_block().round(), 16);
2862
2863 let block = core.try_propose(true).expect("No error").unwrap();
2866 assert_eq!(block.round(), 17);
2867 assert_eq!(block.ancestors().len(), 5);
2868
2869 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2871 .await
2872 .unwrap()
2873 .unwrap();
2874 assert_eq!(extended_block.block.round(), 17);
2875 assert_eq!(extended_block.block.author(), core.context.own_index);
2876 assert_eq!(extended_block.block.ancestors().len(), 5);
2877 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2878
2879 builder
2882 .layers(17..=22)
2883 .authorities(vec![AuthorityIndex::new_for_test(0)])
2884 .skip_block()
2885 .build();
2886 let blocks = builder.blocks(17..=22);
2887
2888 round_tracker.write().update_from_probe(
2894 vec![
2895 vec![22, 22, 22, 22, 22, 22, 22],
2896 vec![22, 22, 22, 22, 22, 22, 22],
2897 vec![22, 22, 22, 22, 22, 22, 22],
2898 vec![22, 22, 22, 22, 22, 22, 22],
2899 vec![22, 22, 22, 22, 22, 22, 22],
2900 vec![22, 22, 22, 22, 22, 22, 22],
2901 vec![22, 22, 22, 22, 22, 22, 22],
2902 ],
2903 vec![
2904 vec![22, 22, 22, 22, 22, 22, 22],
2905 vec![22, 22, 22, 22, 22, 22, 22],
2906 vec![22, 22, 22, 22, 22, 22, 22],
2907 vec![22, 22, 22, 22, 22, 22, 22],
2908 vec![22, 22, 22, 22, 22, 22, 22],
2909 vec![22, 22, 22, 22, 22, 22, 22],
2910 vec![22, 22, 22, 22, 22, 22, 22],
2911 ],
2912 );
2913
2914 let included_block_references = iter::once(&core.last_proposed_block())
2915 .chain(blocks.iter())
2916 .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
2917 .map(|block| block.reference())
2918 .collect::<Vec<_>>();
2919
2920 sleep(context.parameters.min_round_delay).await;
2922 transaction_certifier
2923 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2924 assert!(core.add_blocks(blocks).unwrap().is_empty());
2925 assert_eq!(core.last_proposed_block().round(), 23);
2926
2927 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2929 .await
2930 .unwrap()
2931 .unwrap();
2932 assert_eq!(extended_block.block.round(), 23);
2933 assert_eq!(extended_block.block.author(), core.context.own_index);
2934 assert_eq!(extended_block.block.ancestors().len(), 7);
2935 assert_eq!(extended_block.block.ancestors(), included_block_references);
2936 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2937 }
2938
2939 #[tokio::test]
2940 async fn test_excluded_ancestor_limit() {
2941 telemetry_subscribers::init_for_testing();
2942 let (context, mut key_pairs) = Context::new_for_test(4);
2943 let context = Arc::new(context.with_parameters(Parameters {
2944 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2945 ..Default::default()
2946 }));
2947
2948 let store = Arc::new(MemStore::new());
2949 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2950
2951 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2952 let leader_schedule = Arc::new(
2953 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2954 .with_num_commits_per_schedule(10),
2955 );
2956
2957 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2958 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2959 let (blocks_sender, _blocks_receiver) =
2960 monitored_mpsc::unbounded_channel("consensus_block_output");
2961 let transaction_certifier = TransactionCertifier::new(
2962 context.clone(),
2963 Arc::new(NoopBlockVerifier {}),
2964 dag_state.clone(),
2965 blocks_sender,
2966 );
2967 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2968 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2970
2971 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2972 let commit_observer = CommitObserver::new(
2973 context.clone(),
2974 commit_consumer,
2975 dag_state.clone(),
2976 transaction_certifier.clone(),
2977 leader_schedule.clone(),
2978 )
2979 .await;
2980
2981 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2982 let mut core = Core::new(
2983 context.clone(),
2984 leader_schedule,
2985 transaction_consumer,
2986 transaction_certifier.clone(),
2987 block_manager,
2988 commit_observer,
2989 signals,
2990 key_pairs.remove(context.own_index.value()).1,
2991 dag_state.clone(),
2992 true,
2993 round_tracker,
2994 );
2995
2996 assert_eq!(
2998 core.last_proposed_round(),
2999 GENESIS_ROUND,
3000 "No block should have been created other than genesis"
3001 );
3002
3003 let mut builder = DagBuilder::new(context.clone());
3005 builder.layers(1..=3).build();
3006
3007 builder
3011 .layer(4)
3012 .authorities(vec![AuthorityIndex::new_for_test(1)])
3013 .equivocate(9)
3014 .build();
3015 let blocks = builder.blocks(1..=4);
3016
3017 transaction_certifier
3019 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3020 assert!(core.add_blocks(blocks).unwrap().is_empty());
3021 core.set_last_known_proposed_round(3);
3022
3023 let block = core.try_propose(true).expect("No error").unwrap();
3024 assert_eq!(block.round(), 5);
3025 assert_eq!(block.ancestors().len(), 4);
3026
3027 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3029 .await
3030 .unwrap()
3031 .unwrap();
3032 assert_eq!(extended_block.block.round(), 5);
3033 assert_eq!(extended_block.block.author(), core.context.own_index);
3034 assert_eq!(extended_block.block.ancestors().len(), 4);
3035 assert_eq!(extended_block.excluded_ancestors.len(), 8);
3036 }
3037
3038 #[tokio::test]
3039 async fn test_core_set_propagation_delay_per_authority() {
3040 telemetry_subscribers::init_for_testing();
3042 let (context, mut key_pairs) = Context::new_for_test(4);
3043 let context = Arc::new(context);
3044 let store = Arc::new(MemStore::new());
3045 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3046
3047 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3048 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3049 context.clone(),
3050 dag_state.clone(),
3051 ));
3052
3053 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3054 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3055 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3056 let (blocks_sender, _blocks_receiver) =
3057 monitored_mpsc::unbounded_channel("consensus_block_output");
3058 let transaction_certifier = TransactionCertifier::new(
3059 context.clone(),
3060 Arc::new(NoopBlockVerifier {}),
3061 dag_state.clone(),
3062 blocks_sender,
3063 );
3064 let _block_receiver = signal_receivers.block_broadcast_receiver();
3066
3067 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
3068 let commit_observer = CommitObserver::new(
3069 context.clone(),
3070 commit_consumer,
3071 dag_state.clone(),
3072 transaction_certifier.clone(),
3073 leader_schedule.clone(),
3074 )
3075 .await;
3076
3077 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
3078 let mut core = Core::new(
3079 context.clone(),
3080 leader_schedule,
3081 transaction_consumer,
3082 transaction_certifier.clone(),
3083 block_manager,
3084 commit_observer,
3085 signals,
3086 key_pairs.remove(context.own_index.value()).1,
3087 dag_state.clone(),
3088 false,
3089 round_tracker.clone(),
3090 );
3091
3092 let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3097 transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3098 dag_state.write().accept_block(test_block);
3100
3101 round_tracker.write().update_from_probe(
3102 vec![
3103 vec![0, 0, 0, 0],
3104 vec![0, 0, 0, 0],
3105 vec![0, 0, 0, 0],
3106 vec![0, 0, 0, 0],
3107 ],
3108 vec![
3109 vec![0, 0, 0, 0],
3110 vec![0, 0, 0, 0],
3111 vec![0, 0, 0, 0],
3112 vec![0, 0, 0, 0],
3113 ],
3114 );
3115
3116 assert!(core.try_propose(true).unwrap().is_none());
3118
3119 round_tracker.write().update_from_probe(
3123 vec![
3124 vec![1000, 1000, 1000, 1000],
3125 vec![1000, 1000, 1000, 1000],
3126 vec![1000, 1000, 1000, 1000],
3127 vec![1000, 1000, 1000, 1000],
3128 ],
3129 vec![
3130 vec![1000, 1000, 1000, 1000],
3131 vec![1000, 1000, 1000, 1000],
3132 vec![1000, 1000, 1000, 1000],
3133 vec![1000, 1000, 1000, 1000],
3134 ],
3135 );
3136
3137 for author in 1..4 {
3140 let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3141 transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3142 dag_state.write().accept_block(block);
3144 }
3145
3146 assert!(core.try_propose(true).unwrap().is_some());
3148 }
3149
3150 #[tokio::test(flavor = "current_thread", start_paused = true)]
3151 async fn test_leader_schedule_change() {
3152 telemetry_subscribers::init_for_testing();
3153 let default_params = Parameters::default();
3154
3155 let (context, _) = Context::new_for_test(4);
3156 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3158
3159 let mut last_round_blocks = Vec::new();
3161 for round in 1..=30 {
3162 let mut this_round_blocks = Vec::new();
3163
3164 sleep(default_params.min_round_delay).await;
3166
3167 for core_fixture in &mut cores {
3168 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3171
3172 core_fixture.core.round_tracker.write().update_from_probe(
3173 vec![
3174 vec![round, round, round, round],
3175 vec![round, round, round, round],
3176 vec![round, round, round, round],
3177 vec![round, round, round, round],
3178 ],
3179 vec![
3180 vec![round, round, round, round],
3181 vec![round, round, round, round],
3182 vec![round, round, round, round],
3183 vec![round, round, round, round],
3184 ],
3185 );
3186
3187 let new_round = receive(
3189 Duration::from_secs(1),
3190 core_fixture.signal_receivers.new_round_receiver(),
3191 )
3192 .await;
3193 assert_eq!(new_round, round);
3194
3195 let extended_block = tokio::time::timeout(
3197 Duration::from_secs(1),
3198 core_fixture.block_receiver.recv(),
3199 )
3200 .await
3201 .unwrap()
3202 .unwrap();
3203 assert_eq!(extended_block.block.round(), round);
3204 assert_eq!(
3205 extended_block.block.author(),
3206 core_fixture.core.context.own_index
3207 );
3208
3209 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3211
3212 let block = core_fixture.core.last_proposed_block();
3213
3214 assert_eq!(
3216 block.ancestors().len(),
3217 core_fixture.core.context.committee.size()
3218 );
3219 for ancestor in block.ancestors() {
3220 if block.round() > 1 {
3221 assert!(
3223 last_round_blocks
3224 .iter()
3225 .any(|block| block.reference() == *ancestor),
3226 "Reference from previous round should be added"
3227 );
3228 }
3229 }
3230 }
3231
3232 last_round_blocks = this_round_blocks;
3233 }
3234
3235 for core_fixture in cores {
3236 core_fixture.dag_state.write().flush();
3238
3239 let last_commit = core_fixture
3241 .store
3242 .read_last_commit()
3243 .unwrap()
3244 .expect("last commit should be set");
3245 assert_eq!(last_commit.index(), 27);
3249 let all_stored_commits = core_fixture
3250 .store
3251 .scan_commits((0..=CommitIndex::MAX).into())
3252 .unwrap();
3253 assert_eq!(all_stored_commits.len(), 27);
3254 assert_eq!(
3255 core_fixture
3256 .core
3257 .leader_schedule
3258 .leader_swap_table
3259 .read()
3260 .bad_nodes
3261 .len(),
3262 1
3263 );
3264 assert_eq!(
3265 core_fixture
3266 .core
3267 .leader_schedule
3268 .leader_swap_table
3269 .read()
3270 .good_nodes
3271 .len(),
3272 1
3273 );
3274 let expected_reputation_scores =
3275 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3276 assert_eq!(
3277 core_fixture
3278 .core
3279 .leader_schedule
3280 .leader_swap_table
3281 .read()
3282 .reputation_scores,
3283 expected_reputation_scores
3284 );
3285 }
3286 }
3287
3288 #[tokio::test]
3289 async fn test_filter_new_commits() {
3290 telemetry_subscribers::init_for_testing();
3291
3292 let (context, _key_pairs) = Context::new_for_test(4);
3293 let context = context.with_parameters(Parameters {
3294 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3295 ..Default::default()
3296 });
3297
3298 let authority_index = AuthorityIndex::new_for_test(0);
3299 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3300 let mut core = core.core;
3301
3302 assert_eq!(
3304 core.last_proposed_round(),
3305 GENESIS_ROUND,
3306 "No block should have been created other than genesis"
3307 );
3308
3309 let mut dag_builder = DagBuilder::new(core.context.clone());
3311 dag_builder.layers(1..=12).build();
3312
3313 dag_builder.print();
3315 let blocks = dag_builder.blocks(1..=6);
3316
3317 for block in blocks {
3318 core.dag_state.write().accept_block(block);
3319 }
3320
3321 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3323
3324 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3326
3327 assert_eq!(committed_sub_dags.len(), 4);
3329
3330 println!("Case 1. Provide certified commits that are all before the last committed round.");
3332
3333 let certified_commits = sub_dags_and_commits
3335 .iter()
3336 .take(4)
3337 .map(|(_, c)| c)
3338 .cloned()
3339 .collect::<Vec<_>>();
3340 assert!(
3341 certified_commits.last().unwrap().index()
3342 <= committed_sub_dags.last().unwrap().commit_ref.index,
3343 "Highest certified commit should older than the highest committed index."
3344 );
3345
3346 let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3347
3348 assert!(certified_commits.is_empty());
3350
3351 println!("Case 2. Provide certified commits that are all after the last committed round.");
3352
3353 let certified_commits = sub_dags_and_commits
3355 .iter()
3356 .take(5)
3357 .map(|(_, c)| c.clone())
3358 .collect::<Vec<_>>();
3359
3360 let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3361
3362 assert_eq!(certified_commits.len(), 1);
3364 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3365
3366 println!(
3367 "Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1."
3368 );
3369
3370 let certified_commits = sub_dags_and_commits
3372 .iter()
3373 .skip(5)
3374 .take(1)
3375 .map(|(_, c)| c.clone())
3376 .collect::<Vec<_>>();
3377
3378 let err = core
3379 .filter_new_commits(certified_commits.clone())
3380 .unwrap_err();
3381 match err {
3382 ConsensusError::UnexpectedCertifiedCommitIndex {
3383 expected_commit_index: 5,
3384 commit_index: 6,
3385 } => (),
3386 _ => panic!("Unexpected error: {:?}", err),
3387 }
3388 }
3389
3390 #[tokio::test]
3391 async fn test_add_certified_commits() {
3392 telemetry_subscribers::init_for_testing();
3393
3394 let (context, _key_pairs) = Context::new_for_test(4);
3395 let context = context.with_parameters(Parameters {
3396 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3397 ..Default::default()
3398 });
3399
3400 let authority_index = AuthorityIndex::new_for_test(0);
3401 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3402 let store = core.store.clone();
3403 let mut core = core.core;
3404
3405 assert_eq!(
3407 core.last_proposed_round(),
3408 GENESIS_ROUND,
3409 "No block should have been created other than genesis"
3410 );
3411
3412 let mut dag_builder = DagBuilder::new(core.context.clone());
3414 dag_builder.layers(1..=12).build();
3415
3416 dag_builder.print();
3418 let blocks = dag_builder.blocks(1..=6);
3419
3420 for block in blocks {
3421 core.dag_state.write().accept_block(block);
3422 }
3423
3424 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3426
3427 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3429
3430 assert_eq!(committed_sub_dags.len(), 4);
3432
3433 core.dag_state.write().flush();
3435
3436 println!("Case 1. Provide no certified commits. No commit should happen.");
3437
3438 let last_commit = store
3439 .read_last_commit()
3440 .unwrap()
3441 .expect("Last commit should be set");
3442 assert_eq!(last_commit.reference().index, 4);
3443
3444 println!(
3445 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3446 );
3447
3448 let certified_commits = sub_dags_and_commits
3450 .iter()
3451 .skip(3)
3452 .take(5)
3453 .map(|(_, c)| c.clone())
3454 .collect::<Vec<_>>();
3455
3456 let blocks = dag_builder.blocks(8..=12);
3458 for block in blocks {
3459 core.dag_state.write().accept_block(block);
3460 }
3461
3462 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3464 .expect("Should not fail");
3465
3466 core.dag_state.write().flush();
3468
3469 let commits = store.scan_commits((6..=10).into()).unwrap();
3470
3471 assert_eq!(commits.len(), 5);
3473
3474 for i in 6..=10 {
3475 let commit = &commits[i - 6];
3476 assert_eq!(commit.reference().index, i as u32);
3477 }
3478 }
3479
3480 #[tokio::test]
3481 async fn try_commit_with_certified_commits_gced_blocks() {
3482 const GC_DEPTH: u32 = 3;
3483 telemetry_subscribers::init_for_testing();
3484
3485 let (mut context, mut key_pairs) = Context::new_for_test(5);
3486 context
3487 .protocol_config
3488 .set_consensus_gc_depth_for_testing(GC_DEPTH);
3489 let context = Arc::new(context.with_parameters(Parameters {
3490 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3491 ..Default::default()
3492 }));
3493
3494 let store = Arc::new(MemStore::new());
3495 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3496
3497 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3498 let leader_schedule = Arc::new(
3499 LeaderSchedule::from_store(context.clone(), dag_state.clone())
3500 .with_num_commits_per_schedule(10),
3501 );
3502
3503 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3504 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3505 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3506 let (blocks_sender, _blocks_receiver) =
3507 monitored_mpsc::unbounded_channel("consensus_block_output");
3508 let transaction_certifier = TransactionCertifier::new(
3509 context.clone(),
3510 Arc::new(NoopBlockVerifier {}),
3511 dag_state.clone(),
3512 blocks_sender,
3513 );
3514 let _block_receiver = signal_receivers.block_broadcast_receiver();
3516
3517 let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
3518 let commit_observer = CommitObserver::new(
3519 context.clone(),
3520 commit_consumer,
3521 dag_state.clone(),
3522 transaction_certifier.clone(),
3523 leader_schedule.clone(),
3524 )
3525 .await;
3526
3527 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
3528 let mut core = Core::new(
3529 context.clone(),
3530 leader_schedule,
3531 transaction_consumer,
3532 transaction_certifier.clone(),
3533 block_manager,
3534 commit_observer,
3535 signals,
3536 key_pairs.remove(context.own_index.value()).1,
3537 dag_state.clone(),
3538 true,
3539 round_tracker,
3540 );
3541
3542 assert_eq!(
3544 core.last_proposed_round(),
3545 GENESIS_ROUND,
3546 "No block should have been created other than genesis"
3547 );
3548
3549 let dag_str = "DAG {
3550 Round 0 : { 5 },
3551 Round 1 : { * },
3552 Round 2 : {
3553 A -> [-E1],
3554 B -> [-E1],
3555 C -> [-E1],
3556 D -> [-E1],
3557 },
3558 Round 3 : {
3559 A -> [*],
3560 B -> [*],
3561 C -> [*],
3562 D -> [*],
3563 },
3564 Round 4 : {
3565 A -> [*],
3566 B -> [*],
3567 C -> [*],
3568 D -> [*],
3569 },
3570 Round 5 : {
3571 A -> [*],
3572 B -> [*],
3573 C -> [*],
3574 D -> [*],
3575 E -> [A4, B4, C4, D4, E1]
3576 },
3577 Round 6 : { * },
3578 Round 7 : { * },
3579 }";
3580
3581 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3582 dag_builder.print();
3583
3584 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3586 .get_sub_dag_and_certified_commits(1..=5)
3587 .into_iter()
3588 .unzip();
3589
3590 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3593
3594 assert_eq!(committed_sub_dags.len(), 4);
3596 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3597 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3598
3599 for block in committed_sub_dag.blocks.iter() {
3601 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3602 panic!("Did not expect to commit block E1");
3603 }
3604 }
3605 }
3606 }
3607
3608 #[tokio::test(flavor = "current_thread", start_paused = true)]
3609 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3610 parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3611 }
3612
3613 #[tokio::test(flavor = "current_thread", start_paused = true)]
3614 async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3615 parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3616 }
3617
3618 async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3619 num_leaders_per_round: Option<usize>,
3620 ) {
3621 telemetry_subscribers::init_for_testing();
3622 let default_params = Parameters::default();
3623
3624 let (mut context, _) = Context::new_for_test(6);
3625 context
3626 .protocol_config
3627 .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3628 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3630
3631 let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3633 for round in 1..=33 {
3634 let mut this_round_blocks = Vec::new();
3635
3636 sleep(default_params.min_round_delay).await;
3638
3639 for core_fixture in &mut cores {
3640 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3643
3644 core_fixture.core.round_tracker.write().update_from_probe(
3645 vec![
3646 vec![round, round, round, round, round, round],
3647 vec![round, round, round, round, round, round],
3648 vec![round, round, round, round, round, round],
3649 vec![round, round, round, round, round, round],
3650 vec![round, round, round, round, round, round],
3651 vec![round, round, round, round, round, round],
3652 ],
3653 vec![
3654 vec![round, round, round, round, round, round],
3655 vec![round, round, round, round, round, round],
3656 vec![round, round, round, round, round, round],
3657 vec![round, round, round, round, round, round],
3658 vec![round, round, round, round, round, round],
3659 vec![round, round, round, round, round, round],
3660 ],
3661 );
3662
3663 let new_round = receive(
3665 Duration::from_secs(1),
3666 core_fixture.signal_receivers.new_round_receiver(),
3667 )
3668 .await;
3669 assert_eq!(new_round, round);
3670
3671 let extended_block = tokio::time::timeout(
3673 Duration::from_secs(1),
3674 core_fixture.block_receiver.recv(),
3675 )
3676 .await
3677 .unwrap()
3678 .unwrap();
3679 assert_eq!(extended_block.block.round(), round);
3680 assert_eq!(
3681 extended_block.block.author(),
3682 core_fixture.core.context.own_index
3683 );
3684
3685 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3687
3688 let block = core_fixture.core.last_proposed_block();
3689
3690 assert_eq!(
3692 block.ancestors().len(),
3693 core_fixture.core.context.committee.size()
3694 );
3695 for ancestor in block.ancestors() {
3696 if block.round() > 1 {
3697 assert!(
3699 last_round_blocks
3700 .iter()
3701 .any(|block| block.reference() == *ancestor),
3702 "Reference from previous round should be added"
3703 );
3704 }
3705 }
3706 }
3707
3708 last_round_blocks = this_round_blocks;
3709 }
3710
3711 for core_fixture in cores {
3712 let expected_commit_count = match num_leaders_per_round {
3724 Some(1) => 30,
3725 _ => 31,
3726 };
3727
3728 core_fixture.dag_state.write().flush();
3730
3731 let last_commit = core_fixture
3733 .store
3734 .read_last_commit()
3735 .unwrap()
3736 .expect("last commit should be set");
3737 assert_eq!(last_commit.index(), expected_commit_count);
3738 let all_stored_commits = core_fixture
3739 .store
3740 .scan_commits((0..=CommitIndex::MAX).into())
3741 .unwrap();
3742 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3743 assert_eq!(
3744 core_fixture
3745 .core
3746 .leader_schedule
3747 .leader_swap_table
3748 .read()
3749 .bad_nodes
3750 .len(),
3751 1
3752 );
3753 assert_eq!(
3754 core_fixture
3755 .core
3756 .leader_schedule
3757 .leader_swap_table
3758 .read()
3759 .good_nodes
3760 .len(),
3761 1
3762 );
3763 let expected_reputation_scores =
3764 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3765 assert_eq!(
3766 core_fixture
3767 .core
3768 .leader_schedule
3769 .leader_swap_table
3770 .read()
3771 .reputation_scores,
3772 expected_reputation_scores
3773 );
3774 }
3775 }
3776
3777 #[tokio::test]
3778 async fn test_core_signals() {
3779 telemetry_subscribers::init_for_testing();
3780 let default_params = Parameters::default();
3781
3782 let (context, _) = Context::new_for_test(4);
3783 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3785
3786 let mut last_round_blocks = Vec::new();
3788 for round in 1..=10 {
3789 let mut this_round_blocks = Vec::new();
3790
3791 sleep(default_params.min_round_delay).await;
3793
3794 for core_fixture in &mut cores {
3795 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3798
3799 core_fixture.core.round_tracker.write().update_from_probe(
3800 vec![
3801 vec![round, round, round, round],
3802 vec![round, round, round, round],
3803 vec![round, round, round, round],
3804 vec![round, round, round, round],
3805 ],
3806 vec![
3807 vec![round, round, round, round],
3808 vec![round, round, round, round],
3809 vec![round, round, round, round],
3810 vec![round, round, round, round],
3811 ],
3812 );
3813
3814 let new_round = receive(
3816 Duration::from_secs(1),
3817 core_fixture.signal_receivers.new_round_receiver(),
3818 )
3819 .await;
3820 assert_eq!(new_round, round);
3821
3822 let extended_block = tokio::time::timeout(
3824 Duration::from_secs(1),
3825 core_fixture.block_receiver.recv(),
3826 )
3827 .await
3828 .unwrap()
3829 .unwrap();
3830 assert_eq!(extended_block.block.round(), round);
3831 assert_eq!(
3832 extended_block.block.author(),
3833 core_fixture.core.context.own_index
3834 );
3835
3836 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3838
3839 let block = core_fixture.core.last_proposed_block();
3840
3841 assert_eq!(
3843 block.ancestors().len(),
3844 core_fixture.core.context.committee.size()
3845 );
3846 for ancestor in block.ancestors() {
3847 if block.round() > 1 {
3848 assert!(
3850 last_round_blocks
3851 .iter()
3852 .any(|block| block.reference() == *ancestor),
3853 "Reference from previous round should be added"
3854 );
3855 }
3856 }
3857 }
3858
3859 last_round_blocks = this_round_blocks;
3860 }
3861
3862 for core_fixture in cores {
3863 core_fixture.dag_state.write().flush();
3865 let last_commit = core_fixture
3867 .store
3868 .read_last_commit()
3869 .unwrap()
3870 .expect("last commit should be set");
3871 assert_eq!(last_commit.index(), 7);
3875 let all_stored_commits = core_fixture
3876 .store
3877 .scan_commits((0..=CommitIndex::MAX).into())
3878 .unwrap();
3879 assert_eq!(all_stored_commits.len(), 7);
3880 }
3881 }
3882
3883 #[tokio::test]
3884 async fn test_core_compress_proposal_references() {
3885 telemetry_subscribers::init_for_testing();
3886 let default_params = Parameters::default();
3887
3888 let (context, _) = Context::new_for_test(4);
3889 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3891
3892 let mut last_round_blocks = Vec::new();
3893 let mut all_blocks = Vec::new();
3894
3895 let excluded_authority = AuthorityIndex::new_for_test(3);
3896
3897 for round in 1..=10 {
3898 let mut this_round_blocks = Vec::new();
3899
3900 for core_fixture in &mut cores {
3901 if core_fixture.core.context.own_index == excluded_authority {
3903 continue;
3904 }
3905
3906 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3908 core_fixture.core.round_tracker.write().update_from_probe(
3909 vec![
3910 vec![round, round, round, round],
3911 vec![round, round, round, round],
3912 vec![round, round, round, round],
3913 vec![round, round, round, round],
3914 ],
3915 vec![
3916 vec![round, round, round, round],
3917 vec![round, round, round, round],
3918 vec![round, round, round, round],
3919 vec![round, round, round, round],
3920 ],
3921 );
3922 core_fixture.core.new_block(round, true).unwrap();
3923
3924 let block = core_fixture.core.last_proposed_block();
3925 assert_eq!(block.round(), round);
3926
3927 this_round_blocks.push(block.clone());
3929 }
3930
3931 last_round_blocks = this_round_blocks.clone();
3932 all_blocks.extend(this_round_blocks);
3933 }
3934
3935 let core_fixture = &mut cores[excluded_authority];
3939 sleep(default_params.min_round_delay).await;
3941 core_fixture.add_blocks(all_blocks).unwrap();
3943
3944 let block = core_fixture.core.last_proposed_block();
3947 assert_eq!(block.round(), 11);
3948 assert_eq!(block.ancestors().len(), 4);
3949 for block_ref in block.ancestors() {
3950 if block_ref.author == excluded_authority {
3951 assert_eq!(block_ref.round, 1);
3952 } else {
3953 assert_eq!(block_ref.round, 10);
3954 }
3955 }
3956
3957 core_fixture.dag_state.write().flush();
3959
3960 let last_commit = core_fixture
3962 .store
3963 .read_last_commit()
3964 .unwrap()
3965 .expect("last commit should be set");
3966 assert_eq!(last_commit.index(), 6);
3970 let all_stored_commits = core_fixture
3971 .store
3972 .scan_commits((0..=CommitIndex::MAX).into())
3973 .unwrap();
3974 assert_eq!(all_stored_commits.len(), 6);
3975 }
3976
3977 #[tokio::test]
3978 async fn try_select_certified_leaders() {
3979 telemetry_subscribers::init_for_testing();
3981
3982 let (context, _) = Context::new_for_test(4);
3983
3984 let authority_index = AuthorityIndex::new_for_test(0);
3985 let core =
3986 CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
3987 let mut core = core.core;
3988
3989 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
3990 dag_builder.layers(1..=12).build();
3991
3992 let limit = 2;
3993
3994 let blocks = dag_builder.blocks(1..=12);
3995
3996 for block in blocks {
3997 core.dag_state.write().accept_block(block);
3998 }
3999
4000 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4002 let mut certified_commits = sub_dags_and_commits
4003 .into_iter()
4004 .map(|(_, commit)| commit)
4005 .collect::<Vec<_>>();
4006
4007 let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
4008
4009 assert_eq!(leaders.len(), 2);
4011 assert_eq!(certified_commits.len(), 2);
4012 }
4013
4014 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4015 tokio::time::timeout(timeout, receiver.changed())
4016 .await
4017 .expect("Timeout while waiting to read from receiver")
4018 .expect("Signal receive channel shouldn't be closed");
4019 *receiver.borrow_and_update()
4020 }
4021}