1use std::{
5 collections::{BTreeMap, BTreeSet},
6 iter,
7 sync::Arc,
8 time::Duration,
9 vec,
10};
11
12use consensus_config::{AuthorityIndex, ProtocolKeyPair};
13#[cfg(test)]
14use consensus_config::{Stake, local_committee_and_keys};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
16use itertools::Itertools as _;
17#[cfg(test)]
18use mysten_metrics::monitored_mpsc::UnboundedReceiver;
19use mysten_metrics::monitored_scope;
20use parking_lot::RwLock;
21use sui_macros::fail_point;
22use tokio::{
23 sync::{broadcast, watch},
24 time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30 CommitConsumerArgs, TransactionClient, block::CertifiedBlocksOutput,
31 block_verifier::NoopBlockVerifier, storage::mem_store::MemStore,
32};
33use crate::{
34 ancestor::{AncestorState, AncestorStateManager},
35 block::{
36 Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
37 VerifiedBlock,
38 },
39 block_manager::BlockManager,
40 commit::{
41 CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42 },
43 commit_observer::CommitObserver,
44 context::Context,
45 dag_state::DagState,
46 error::{ConsensusError, ConsensusResult},
47 leader_schedule::LeaderSchedule,
48 round_tracker::PeerRoundTracker,
49 stake_aggregator::{QuorumThreshold, StakeAggregator},
50 transaction::TransactionConsumer,
51 transaction_certifier::TransactionCertifier,
52 universal_committer::{
53 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
54 },
55};
56
/// Upper bound on the number of commit votes taken from the DAG state for one proposed block.
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
60
61pub(crate) struct Core {
62 context: Arc<Context>,
63 transaction_consumer: TransactionConsumer,
65 transaction_certifier: TransactionCertifier,
67 block_manager: BlockManager,
70 propagation_delay: Round,
77 committer: UniversalCommitter,
79 last_signaled_round: Round,
81 last_included_ancestors: Vec<Option<BlockRef>>,
85 last_decided_leader: Slot,
90 leader_schedule: Arc<LeaderSchedule>,
93 commit_observer: CommitObserver,
96 signals: CoreSignals,
98 block_signer: ProtocolKeyPair,
100 dag_state: Arc<RwLock<DagState>>,
102 last_known_proposed_round: Option<Round>,
106 ancestor_state_manager: AncestorStateManager,
111 round_tracker: Arc<RwLock<PeerRoundTracker>>,
116}
117
118impl Core {
119 pub(crate) fn new(
120 context: Arc<Context>,
121 leader_schedule: Arc<LeaderSchedule>,
122 transaction_consumer: TransactionConsumer,
123 transaction_certifier: TransactionCertifier,
124 block_manager: BlockManager,
125 commit_observer: CommitObserver,
126 signals: CoreSignals,
127 block_signer: ProtocolKeyPair,
128 dag_state: Arc<RwLock<DagState>>,
129 sync_last_known_own_block: bool,
130 round_tracker: Arc<RwLock<PeerRoundTracker>>,
131 ) -> Self {
132 let last_decided_leader = dag_state.read().last_commit_leader();
133 let number_of_leaders = context
134 .protocol_config
135 .mysticeti_num_leaders_per_round()
136 .unwrap_or(1);
137 let committer = UniversalCommitterBuilder::new(
138 context.clone(),
139 leader_schedule.clone(),
140 dag_state.clone(),
141 )
142 .with_number_of_leaders(number_of_leaders)
143 .with_pipeline(true)
144 .build();
145
146 let last_proposed_block = dag_state.read().get_last_proposed_block();
147
148 let last_signaled_round = last_proposed_block.round();
149
150 let mut last_included_ancestors = vec![None; context.committee.size()];
159 for ancestor in last_proposed_block.ancestors() {
160 last_included_ancestors[ancestor.author] = Some(*ancestor);
161 }
162
163 let min_propose_round = if sync_last_known_own_block {
164 None
165 } else {
166 Some(0)
168 };
169
170 let propagation_scores = leader_schedule
171 .leader_swap_table
172 .read()
173 .reputation_scores
174 .clone();
175 let mut ancestor_state_manager =
176 AncestorStateManager::new(context.clone(), dag_state.clone());
177 ancestor_state_manager.set_propagation_scores(propagation_scores);
178
179 Self {
180 context,
181 last_signaled_round,
182 last_included_ancestors,
183 last_decided_leader,
184 leader_schedule,
185 transaction_consumer,
186 transaction_certifier,
187 block_manager,
188 propagation_delay: 0,
189 committer,
190 commit_observer,
191 signals,
192 block_signer,
193 dag_state,
194 last_known_proposed_round: min_propose_round,
195 ancestor_state_manager,
196 round_tracker,
197 }
198 .recover()
199 }
200
201 fn recover(mut self) -> Self {
202 let _s = self
203 .context
204 .metrics
205 .node_metrics
206 .scope_processing_time
207 .with_label_values(&["Core::recover"])
208 .start_timer();
209
210 self.try_commit(vec![]).unwrap();
212
213 let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
214 {
215 last_proposed_block
216 } else {
217 let last_proposed_block = self.dag_state.read().get_last_proposed_block();
218
219 if self.should_propose() {
220 assert!(
221 last_proposed_block.round() > GENESIS_ROUND,
222 "At minimum a block of round higher than genesis should have been produced during recovery"
223 );
224 }
225
226 self.signals
228 .new_block(ExtendedBlock {
229 block: last_proposed_block.clone(),
230 excluded_ancestors: vec![],
231 })
232 .unwrap();
233 last_proposed_block
234 };
235
236 self.try_signal_new_round();
240
241 info!(
242 "Core recovery completed with last proposed block {:?}",
243 last_proposed_block
244 );
245
246 self
247 }
248
249 #[tracing::instrument(skip_all)]
253 pub(crate) fn add_blocks(
254 &mut self,
255 blocks: Vec<VerifiedBlock>,
256 ) -> ConsensusResult<BTreeSet<BlockRef>> {
257 let _scope = monitored_scope("Core::add_blocks");
258 let _s = self
259 .context
260 .metrics
261 .node_metrics
262 .scope_processing_time
263 .with_label_values(&["Core::add_blocks"])
264 .start_timer();
265 self.context
266 .metrics
267 .node_metrics
268 .core_add_blocks_batch_size
269 .observe(blocks.len() as f64);
270
271 let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
272
273 if !accepted_blocks.is_empty() {
274 trace!(
275 "Accepted blocks: {}",
276 accepted_blocks
277 .iter()
278 .map(|b| b.reference().to_string())
279 .join(",")
280 );
281
282 self.try_commit(vec![])?;
284
285 self.try_propose(false)?;
287
288 self.try_signal_new_round();
292 };
293
294 if !missing_block_refs.is_empty() {
295 trace!(
296 "Missing block refs: {}",
297 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
298 );
299 }
300
301 Ok(missing_block_refs)
302 }
303
304 #[tracing::instrument(skip_all)]
309 pub(crate) fn add_certified_commits(
310 &mut self,
311 certified_commits: CertifiedCommits,
312 ) -> ConsensusResult<BTreeSet<BlockRef>> {
313 let _scope = monitored_scope("Core::add_certified_commits");
314
315 let votes = certified_commits.votes().to_vec();
316 let commits = self
317 .filter_new_commits(certified_commits.commits().to_vec())
318 .expect("Certified commits validation failed");
319
320 let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);
324
325 self.try_commit(commits)?;
327
328 self.try_propose(false)?;
330
331 self.try_signal_new_round();
335
336 Ok(missing_block_refs)
337 }
338
339 pub(crate) fn check_block_refs(
342 &mut self,
343 block_refs: Vec<BlockRef>,
344 ) -> ConsensusResult<BTreeSet<BlockRef>> {
345 let _scope = monitored_scope("Core::check_block_refs");
346 let _s = self
347 .context
348 .metrics
349 .node_metrics
350 .scope_processing_time
351 .with_label_values(&["Core::check_block_refs"])
352 .start_timer();
353 self.context
354 .metrics
355 .node_metrics
356 .core_check_block_refs_batch_size
357 .observe(block_refs.len() as f64);
358
359 let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
361
362 if !missing_block_refs.is_empty() {
363 trace!(
364 "Missing block refs: {}",
365 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
366 );
367 }
368 Ok(missing_block_refs)
369 }
370
371 fn try_signal_new_round(&mut self) {
373 let new_clock_round = self.dag_state.read().threshold_clock_round();
378 if new_clock_round <= self.last_signaled_round {
379 return;
380 }
381 self.signals.new_round(new_clock_round);
383 self.last_signaled_round = new_clock_round;
384
385 self.context
387 .metrics
388 .node_metrics
389 .threshold_clock_round
390 .set(new_clock_round as i64);
391 }
392
393 pub(crate) fn new_block(
397 &mut self,
398 round: Round,
399 force: bool,
400 ) -> ConsensusResult<Option<VerifiedBlock>> {
401 let _scope = monitored_scope("Core::new_block");
402 if self.last_proposed_round() < round {
403 self.context
404 .metrics
405 .node_metrics
406 .leader_timeout_total
407 .with_label_values(&[&format!("{force}")])
408 .inc();
409 let result = self.try_propose(force);
410 self.try_signal_new_round();
412 return result;
413 }
414 Ok(None)
415 }
416
417 fn filter_new_commits(
420 &mut self,
421 commits: Vec<CertifiedCommit>,
422 ) -> ConsensusResult<Vec<CertifiedCommit>> {
423 let last_commit_index = self.dag_state.read().last_commit_index();
425 let commits = commits
426 .iter()
427 .filter(|commit| {
428 if commit.index() > last_commit_index {
429 true
430 } else {
431 tracing::debug!(
432 "Skip commit for index {} as it is already committed with last commit index {}",
433 commit.index(),
434 last_commit_index
435 );
436 false
437 }
438 })
439 .cloned()
440 .collect::<Vec<_>>();
441
442 if let Some(commit) = commits.first()
444 && commit.index() != last_commit_index + 1
445 {
446 return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
447 expected_commit_index: last_commit_index + 1,
448 commit_index: commit.index(),
449 });
450 }
451
452 Ok(commits)
453 }
454
455 fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
459 if !self.should_propose() {
460 return Ok(None);
461 }
462 if let Some(extended_block) = self.try_new_block(force) {
463 self.signals.new_block(extended_block.clone())?;
464
465 fail_point!("consensus-after-propose");
466
467 self.try_commit(vec![])?;
469 return Ok(Some(extended_block.block));
470 }
471 Ok(None)
472 }
473
474 fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
477 let _s = self
478 .context
479 .metrics
480 .node_metrics
481 .scope_processing_time
482 .with_label_values(&["Core::try_new_block"])
483 .start_timer();
484
485 let clock_round = {
487 let dag_state = self.dag_state.read();
488 let clock_round = dag_state.threshold_clock_round();
489 if clock_round <= dag_state.get_last_proposed_block().round() {
490 debug!(
491 "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
492 clock_round,
493 dag_state.get_last_proposed_block().round()
494 );
495 return None;
496 }
497 clock_round
498 };
499
500 let quorum_round = clock_round.saturating_sub(1);
502
503 if !force {
506 if !self.leaders_exist(quorum_round) {
507 return None;
508 }
509
510 if Duration::from_millis(
511 self.context
512 .clock
513 .timestamp_utc_ms()
514 .saturating_sub(self.last_proposed_timestamp_ms()),
515 ) < self.context.parameters.min_round_delay
516 {
517 debug!(
518 "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
519 clock_round,
520 self.last_proposed_timestamp_ms(),
521 self.context.parameters.min_round_delay.as_millis(),
522 );
523 return None;
524 }
525 }
526
527 let (ancestors, excluded_and_equivocating_ancestors) =
529 self.smart_ancestors_to_propose(clock_round, !force);
530
531 if ancestors.is_empty() {
533 assert!(
534 !force,
535 "Ancestors should have been returned if force is true!"
536 );
537 debug!(
538 "Skipping block proposal for round {} because no good ancestor is found",
539 clock_round,
540 );
541 return None;
542 }
543
544 let excluded_ancestors_limit = self.context.committee.size() * 2;
545 if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
546 debug!(
547 "Dropping {} excluded ancestor(s) during proposal due to size limit",
548 excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
549 );
550 }
551 let excluded_ancestors = excluded_and_equivocating_ancestors
552 .into_iter()
553 .take(excluded_ancestors_limit)
554 .collect();
555
556 for ancestor in &ancestors {
558 self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
559 }
560
561 let leader_authority = &self
562 .context
563 .committee
564 .authority(self.first_leader(quorum_round))
565 .hostname;
566 self.context
567 .metrics
568 .node_metrics
569 .block_proposal_leader_wait_ms
570 .with_label_values(&[leader_authority])
571 .inc_by(
572 Instant::now()
573 .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
574 .as_millis() as u64,
575 );
576 self.context
577 .metrics
578 .node_metrics
579 .block_proposal_leader_wait_count
580 .with_label_values(&[leader_authority])
581 .inc();
582
583 self.context
584 .metrics
585 .node_metrics
586 .proposed_block_ancestors
587 .observe(ancestors.len() as f64);
588 for ancestor in &ancestors {
589 let authority = &self.context.committee.authority(ancestor.author()).hostname;
590 self.context
591 .metrics
592 .node_metrics
593 .proposed_block_ancestors_depth
594 .with_label_values(&[authority])
595 .observe(clock_round.saturating_sub(ancestor.round()).into());
596 }
597
598 let now = self.context.clock.timestamp_utc_ms();
599 ancestors.iter().for_each(|block| {
600 if block.timestamp_ms() > now {
601 trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
602 let authority = &self.context.committee.authority(block.author()).hostname;
603 self.context
604 .metrics
605 .node_metrics
606 .proposed_block_ancestors_timestamp_drift_ms
607 .with_label_values(&[authority])
608 .inc_by(block.timestamp_ms().saturating_sub(now));
609 }
610 });
611
612 let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
615 self.context
616 .metrics
617 .node_metrics
618 .proposed_block_transactions
619 .observe(transactions.len() as f64);
620
621 let commit_votes = self
623 .dag_state
624 .write()
625 .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
626
627 let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
628 let new_causal_history = {
629 let mut dag_state = self.dag_state.write();
630 ancestors
631 .iter()
632 .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
633 .collect()
634 };
635 self.transaction_certifier.get_own_votes(new_causal_history)
636 } else {
637 vec![]
638 };
639
640 let block = if self.context.protocol_config.mysticeti_fastpath() {
642 Block::V2(BlockV2::new(
643 self.context.committee.epoch(),
644 clock_round,
645 self.context.own_index,
646 now,
647 ancestors.iter().map(|b| b.reference()).collect(),
648 transactions,
649 commit_votes,
650 transaction_votes,
651 vec![],
652 ))
653 } else {
654 Block::V1(BlockV1::new(
655 self.context.committee.epoch(),
656 clock_round,
657 self.context.own_index,
658 now,
659 ancestors.iter().map(|b| b.reference()).collect(),
660 transactions,
661 commit_votes,
662 vec![],
663 ))
664 };
665 let signed_block =
666 SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
667 let serialized = signed_block
668 .serialize()
669 .expect("Block serialization failed.");
670 self.context
671 .metrics
672 .node_metrics
673 .proposed_block_size
674 .observe(serialized.len() as f64);
675 let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
677
678 let last_proposed_block = self.last_proposed_block();
680 if last_proposed_block.round() > 0 {
681 self.context
682 .metrics
683 .node_metrics
684 .block_proposal_interval
685 .observe(
686 Duration::from_millis(
687 verified_block
688 .timestamp_ms()
689 .saturating_sub(last_proposed_block.timestamp_ms()),
690 )
691 .as_secs_f64(),
692 );
693 }
694
695 let (accepted_blocks, missing) = self
697 .block_manager
698 .try_accept_blocks(vec![verified_block.clone()]);
699 assert_eq!(accepted_blocks.len(), 1);
700 assert!(missing.is_empty());
701
702 if self.context.protocol_config.mysticeti_fastpath() {
706 self.transaction_certifier
707 .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
708 self.dag_state
709 .write()
710 .link_causal_history(verified_block.reference());
711 }
712
713 self.dag_state.write().flush();
715
716 ack_transactions(verified_block.reference());
718
719 info!("Created block {verified_block:?} for round {clock_round}");
720
721 self.context
722 .metrics
723 .node_metrics
724 .proposed_blocks
725 .with_label_values(&[&force.to_string()])
726 .inc();
727
728 let extended_block = ExtendedBlock {
729 block: verified_block,
730 excluded_ancestors,
731 };
732
733 self.round_tracker
735 .write()
736 .update_from_verified_block(&extended_block);
737
738 Some(extended_block)
739 }
740
741 fn try_commit(
744 &mut self,
745 mut certified_commits: Vec<CertifiedCommit>,
746 ) -> ConsensusResult<Vec<CommittedSubDag>> {
747 let _s = self
748 .context
749 .metrics
750 .node_metrics
751 .scope_processing_time
752 .with_label_values(&["Core::try_commit"])
753 .start_timer();
754
755 let mut certified_commits_map = BTreeMap::new();
756 for c in &certified_commits {
757 certified_commits_map.insert(c.index(), c.reference());
758 }
759
760 if !certified_commits.is_empty() {
761 info!(
762 "Processing synced commits: {:?}",
763 certified_commits
764 .iter()
765 .map(|c| (c.index(), c.leader()))
766 .collect::<Vec<_>>()
767 );
768 }
769
770 let mut committed_sub_dags = Vec::new();
771 loop {
773 let mut commits_until_update = self
778 .leader_schedule
779 .commits_until_leader_schedule_update(self.dag_state.clone());
780
781 if commits_until_update == 0 {
782 let last_commit_index = self.dag_state.read().last_commit_index();
783
784 tracing::info!(
785 "Leader schedule change triggered at commit index {last_commit_index}"
786 );
787
788 self.leader_schedule
789 .update_leader_schedule_v2(&self.dag_state);
790
791 let propagation_scores = self
792 .leader_schedule
793 .leader_swap_table
794 .read()
795 .reputation_scores
796 .clone();
797 self.ancestor_state_manager
798 .set_propagation_scores(propagation_scores);
799
800 commits_until_update = self
801 .leader_schedule
802 .commits_until_leader_schedule_update(self.dag_state.clone());
803
804 fail_point!("consensus-after-leader-schedule-change");
805 }
806 assert!(commits_until_update > 0);
807
808 let (certified_leaders, decided_certified_commits): (
811 Vec<DecidedLeader>,
812 Vec<CertifiedCommit>,
813 ) = self
814 .try_select_certified_leaders(&mut certified_commits, commits_until_update)
815 .into_iter()
816 .unzip();
817
818 let blocks = decided_certified_commits
825 .iter()
826 .flat_map(|c| c.blocks())
827 .cloned()
828 .collect::<Vec<_>>();
829 self.block_manager.try_accept_committed_blocks(blocks);
830
831 let (decided_leaders, local) = if certified_leaders.is_empty() {
833 let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
835 if decided_leaders.len() >= commits_until_update {
837 let _ = decided_leaders.split_off(commits_until_update);
838 }
839 (decided_leaders, true)
840 } else {
841 (certified_leaders, false)
842 };
843
844 let Some(last_decided) = decided_leaders.last().cloned() else {
846 break;
847 };
848
849 self.last_decided_leader = last_decided.slot();
850 self.context
851 .metrics
852 .node_metrics
853 .last_decided_leader_round
854 .set(self.last_decided_leader.round as i64);
855
856 let sequenced_leaders = decided_leaders
857 .into_iter()
858 .filter_map(|leader| leader.into_committed_block())
859 .collect::<Vec<_>>();
860 if sequenced_leaders.is_empty() {
863 break;
864 }
865 tracing::info!(
866 "Committing {} leaders: {}; {} commits before next leader schedule change",
867 sequenced_leaders.len(),
868 sequenced_leaders
869 .iter()
870 .map(|b| b.reference().to_string())
871 .join(","),
872 commits_until_update,
873 );
874
875 let subdags = self
877 .commit_observer
878 .handle_commit(sequenced_leaders, local)?;
879
880 self.block_manager
882 .try_unsuspend_blocks_for_latest_gc_round();
883
884 committed_sub_dags.extend(subdags);
885
886 fail_point!("consensus-after-handle-commit");
887 }
888
889 for sub_dag in &committed_sub_dags {
891 if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
892 assert_eq!(
893 commit_ref, sub_dag.commit_ref,
894 "Certified commit has different reference than the committed sub dag"
895 );
896 }
897 }
898
899 let committed_block_refs = committed_sub_dags
901 .iter()
902 .flat_map(|sub_dag| sub_dag.blocks.iter())
903 .filter_map(|block| {
904 (block.author() == self.context.own_index).then_some(block.reference())
905 })
906 .collect::<Vec<_>>();
907 self.transaction_consumer
908 .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
909
910 Ok(committed_sub_dags)
911 }
912
913 pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
914 let _scope = monitored_scope("Core::get_missing_blocks");
915 self.block_manager.missing_blocks()
916 }
917
918 pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
920 info!("Propagation round delay set to: {delay}");
921 self.propagation_delay = delay;
922 }
923
924 pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
928 if self.last_known_proposed_round.is_some() {
929 panic!(
930 "Should not attempt to set the last known proposed round if that has been already set"
931 );
932 }
933 self.last_known_proposed_round = Some(round);
934 info!("Last known proposed round set to {round}");
935 }
936
937 pub(crate) fn should_propose(&self) -> bool {
939 let clock_round = self.dag_state.read().threshold_clock_round();
940 let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
941
942 if self.propagation_delay
943 > self
944 .context
945 .parameters
946 .propagation_delay_stop_proposal_threshold
947 {
948 debug!(
949 "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
950 self.propagation_delay,
951 self.context
952 .parameters
953 .propagation_delay_stop_proposal_threshold
954 );
955 core_skipped_proposals
956 .with_label_values(&["high_propagation_delay"])
957 .inc();
958 return false;
959 }
960
961 let Some(last_known_proposed_round) = self.last_known_proposed_round else {
962 debug!(
963 "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
964 );
965 core_skipped_proposals
966 .with_label_values(&["no_last_known_proposed_round"])
967 .inc();
968 return false;
969 };
970 if clock_round <= last_known_proposed_round {
971 debug!(
972 "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
973 );
974 core_skipped_proposals
975 .with_label_values(&["higher_last_known_proposed_round"])
976 .inc();
977 return false;
978 }
979
980 true
981 }
982
983 #[tracing::instrument(skip_all)]
989 fn try_select_certified_leaders(
990 &mut self,
991 certified_commits: &mut Vec<CertifiedCommit>,
992 limit: usize,
993 ) -> Vec<(DecidedLeader, CertifiedCommit)> {
994 assert!(limit > 0, "limit should be greater than 0");
995 if certified_commits.is_empty() {
996 return vec![];
997 }
998
999 let to_commit = if certified_commits.len() >= limit {
1000 certified_commits.drain(..limit).collect::<Vec<_>>()
1002 } else {
1003 std::mem::take(certified_commits)
1005 };
1006
1007 tracing::debug!(
1008 "Selected {} certified leaders: {}",
1009 to_commit.len(),
1010 to_commit.iter().map(|c| c.leader().to_string()).join(",")
1011 );
1012
1013 to_commit
1014 .into_iter()
1015 .map(|commit| {
1016 let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1017 assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1018 let leader = DecidedLeader::Commit(leader.clone(), false);
1020 UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1021 (leader, commit)
1022 })
1023 .collect::<Vec<_>>()
1024 }
1025
1026 fn smart_ancestors_to_propose(
1030 &mut self,
1031 clock_round: Round,
1032 smart_select: bool,
1033 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1034 let node_metrics = &self.context.metrics.node_metrics;
1035 let _s = node_metrics
1036 .scope_processing_time
1037 .with_label_values(&["Core::smart_ancestors_to_propose"])
1038 .start_timer();
1039
1040 let all_ancestors = self
1042 .dag_state
1043 .read()
1044 .get_last_cached_block_per_authority(clock_round);
1045
1046 assert_eq!(
1047 all_ancestors.len(),
1048 self.context.committee.size(),
1049 "Fatal error, number of returned ancestors don't match committee size."
1050 );
1051
1052 let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();
1054
1055 self.ancestor_state_manager
1056 .update_all_ancestors_state(&accepted_quorum_rounds);
1057
1058 let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1059
1060 let quorum_round = clock_round.saturating_sub(1);
1061
1062 let mut score_and_pending_excluded_ancestors = Vec::new();
1063 let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1064
1065 let included_ancestors = iter::once(self.last_proposed_block().clone())
1070 .chain(
1071 all_ancestors
1072 .into_iter()
1073 .flat_map(|(ancestor, equivocating_ancestors)| {
1074 if ancestor.author() == self.context.own_index {
1075 return None;
1076 }
1077 if let Some(last_block_ref) =
1078 self.last_included_ancestors[ancestor.author()]
1079 && last_block_ref.round >= ancestor.round() {
1080 return None;
1081 }
1082
1083 excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1085
1086 let ancestor_state = ancestor_state_map[ancestor.author()];
1087 match ancestor_state {
1088 AncestorState::Include => {
1089 trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1090 }
1091 AncestorState::Exclude(score) => {
1092 trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1093 score_and_pending_excluded_ancestors.push((score, ancestor));
1094 return None;
1095 }
1096 }
1097
1098 Some(ancestor)
1099 }),
1100 )
1101 .collect::<Vec<_>>();
1102
1103 let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1104
1105 for ancestor in included_ancestors
1107 .iter()
1108 .filter(|a| a.round() == quorum_round)
1109 {
1110 parent_round_quorum.add(ancestor.author(), &self.context.committee);
1111 }
1112
1113 if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1114 node_metrics.smart_selection_wait.inc();
1115 debug!(
1116 "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1117 parent_round_quorum.stake()
1118 );
1119 return (vec![], BTreeSet::new());
1120 }
1121
1122 score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1125
1126 let mut ancestors_to_propose = included_ancestors;
1127 let mut excluded_ancestors = Vec::new();
1128 for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1129 let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1130 if !parent_round_quorum.reached_threshold(&self.context.committee)
1131 && ancestor.round() == quorum_round
1132 {
1133 debug!(
1134 "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1135 );
1136 parent_round_quorum.add(ancestor.author(), &self.context.committee);
1137 ancestors_to_propose.push(ancestor);
1138 node_metrics
1139 .included_excluded_proposal_ancestors_count_by_authority
1140 .with_label_values(&[block_hostname, "timeout"])
1141 .inc();
1142 } else {
1143 excluded_ancestors.push((score, ancestor));
1144 }
1145 }
1146
1147 for (score, ancestor) in excluded_ancestors.iter() {
1152 let excluded_author = ancestor.author();
1153 let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1154 let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
1156 accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1160
1161 let last_included_round = self.last_included_ancestors[excluded_author]
1162 .map(|block_ref| block_ref.round)
1163 .unwrap_or(GENESIS_ROUND);
1164 if ancestor.round() <= last_included_round {
1165 continue;
1168 }
1169
1170 if last_included_round >= accepted_low_quorum_round {
1171 excluded_and_equivocating_ancestors.insert(ancestor.reference());
1172 trace!(
1173 "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1174 ancestor.reference()
1175 );
1176 node_metrics
1177 .excluded_proposal_ancestors_count_by_authority
1178 .with_label_values(&[block_hostname])
1179 .inc();
1180 continue;
1181 }
1182
1183 let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1184 ancestor.clone()
1186 } else {
1187 excluded_and_equivocating_ancestors.insert(ancestor.reference());
1189 trace!(
1190 "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1191 ancestor.reference(),
1192 ancestor.round()
1193 );
1194 node_metrics
1195 .excluded_proposal_ancestors_count_by_authority
1196 .with_label_values(&[block_hostname])
1197 .inc();
1198
1199 match self.dag_state.read().get_last_cached_block_in_range(
1205 excluded_author,
1206 last_included_round + 1,
1207 accepted_low_quorum_round + 1,
1208 ) {
1209 Some(earlier_ancestor) => {
1210 earlier_ancestor
1212 }
1213 None => {
1214 continue;
1216 }
1217 }
1218 };
1219 self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1220 ancestors_to_propose.push(ancestor.clone());
1221 trace!(
1222 "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1223 ancestor.reference()
1224 );
1225 node_metrics
1226 .included_excluded_proposal_ancestors_count_by_authority
1227 .with_label_values(&[block_hostname, "quorum"])
1228 .inc();
1229 }
1230
1231 assert!(
1232 parent_round_quorum.reached_threshold(&self.context.committee),
1233 "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1234 );
1235
1236 debug!(
1237 "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1238 ancestors_to_propose.len(),
1239 excluded_and_equivocating_ancestors.len()
1240 );
1241
1242 (ancestors_to_propose, excluded_and_equivocating_ancestors)
1243 }
1244
1245 fn leaders_exist(&self, round: Round) -> bool {
1249 let dag_state = self.dag_state.read();
1250 for leader in self.leaders(round) {
1251 if !dag_state.contains_cached_block_at_slot(leader) {
1255 return false;
1256 }
1257 }
1258
1259 true
1260 }
1261
1262 fn leaders(&self, round: Round) -> Vec<Slot> {
1264 self.committer
1265 .get_leaders(round)
1266 .into_iter()
1267 .map(|authority_index| Slot::new(round, authority_index))
1268 .collect()
1269 }
1270
1271 fn first_leader(&self, round: Round) -> AuthorityIndex {
1273 self.leaders(round).first().unwrap().authority
1274 }
1275
1276 fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1277 self.last_proposed_block().timestamp_ms()
1278 }
1279
1280 fn last_proposed_round(&self) -> Round {
1281 self.last_proposed_block().round()
1282 }
1283
1284 fn last_proposed_block(&self) -> VerifiedBlock {
1285 self.dag_state.read().get_last_proposed_block()
1286 }
1287}
1288
1289pub(crate) struct CoreSignals {
1291 tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1292 new_round_sender: watch::Sender<Round>,
1293 context: Arc<Context>,
1294}
1295
1296impl CoreSignals {
1297 pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1298 let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1302 context.parameters.dag_state_cached_rounds as usize,
1303 );
1304 let (new_round_sender, new_round_receiver) = watch::channel(0);
1305
1306 let me = Self {
1307 tx_block_broadcast,
1308 new_round_sender,
1309 context,
1310 };
1311
1312 let receivers = CoreSignalsReceivers {
1313 rx_block_broadcast,
1314 new_round_receiver,
1315 };
1316
1317 (me, receivers)
1318 }
1319
1320 pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1323 if self.context.committee.size() > 1 {
1326 if extended_block.block.round() == GENESIS_ROUND {
1327 debug!("Ignoring broadcasting genesis block to peers");
1328 return Ok(());
1329 }
1330
1331 if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1332 warn!("Couldn't broadcast the block to any receiver: {err}");
1333 return Err(ConsensusError::Shutdown);
1334 }
1335 } else {
1336 debug!(
1337 "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1338 );
1339 }
1340 Ok(())
1341 }
1342
1343 pub(crate) fn new_round(&mut self, round_number: Round) {
1346 let _ = self.new_round_sender.send_replace(round_number);
1347 }
1348}
1349
/// Receiving half of the core signals, created by `CoreSignals::new`. Consumers obtain
/// their own receivers through the accessor methods rather than using these fields directly.
pub(crate) struct CoreSignalsReceivers {
    /// Receiver of the block broadcast channel; `block_broadcast_receiver` resubscribes from it.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Receiver of the round watch channel; `new_round_receiver` clones it.
    new_round_receiver: watch::Receiver<Round>,
}
1356
1357impl CoreSignalsReceivers {
1358 pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1359 self.rx_block_broadcast.resubscribe()
1360 }
1361
1362 pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1363 self.new_round_receiver.clone()
1364 }
1365}
1366
#[cfg(test)]
/// Test helper: builds one `CoreTextFixture` per entry of `authorities`, where the fixture
/// at position `i` runs as authority index `i`. Every fixture is created with
/// `sync_last_known_own_block` disabled.
pub(crate) async fn create_cores(
    context: Context,
    authorities: Vec<Stake>,
) -> Vec<CoreTextFixture> {
    let authority_count = authorities.len();
    let mut fixtures = Vec::with_capacity(authority_count);

    for position in 0..authority_count {
        let fixture = CoreTextFixture::new(
            context.clone(),
            authorities.clone(),
            AuthorityIndex::new_for_test(position as u32),
            false,
        )
        .await;
        fixtures.push(fixture);
    }

    fixtures
}
1384
#[cfg(test)]
/// Test fixture bundling a `Core` with the collaborators and channel ends that tests need
/// to drive and inspect it. Built by `CoreTextFixture::new`.
pub(crate) struct CoreTextFixture {
    /// The core under test.
    pub(crate) core: Core,
    /// Clone of the transaction certifier handed to `core`.
    pub(crate) transaction_certifier: TransactionCertifier,
    /// Receivers paired with the `CoreSignals` handed to `core`.
    pub(crate) signal_receivers: CoreSignalsReceivers,
    /// Block broadcast receiver subscribed before the core was created.
    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
    /// Commit output receiver returned by `CommitConsumerArgs::new`; stored in the fixture
    /// so it is not dropped while the fixture exists.
    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
    /// Certified blocks output receiver returned by `CommitConsumerArgs::new`; stored in the
    /// fixture so it is not dropped while the fixture exists.
    pub(crate) _blocks_output_receiver: UnboundedReceiver<CertifiedBlocksOutput>,
    /// DAG state shared with `core`.
    pub(crate) dag_state: Arc<RwLock<DagState>>,
    /// In-memory store backing `dag_state`.
    pub(crate) store: Arc<MemStore>,
}
1396
#[cfg(test)]
impl CoreTextFixture {
    /// Builds a fixture whose core runs as `own_index` in a local committee created from
    /// the `authorities` stakes.
    ///
    /// The supplied `context` is re-targeted at that committee and authority index, and
    /// its bad-nodes stake threshold is set to 33. All collaborators are backed by a fresh
    /// `MemStore`, and the leader schedule uses 10 commits per schedule.
    async fn new(
        context: Context,
        authorities: Vec<Stake>,
        own_index: AuthorityIndex,
        sync_last_known_own_block: bool,
    ) -> Self {
        let (committee, mut signers) = local_committee_and_keys(0, authorities);
        let mut test_context = context
            .with_committee(committee)
            .with_authority_index(own_index);
        test_context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(test_context);

        // Storage and DAG state shared by every component below.
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());

        let schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone())
            .with_num_commits_per_schedule(10);
        let leader_schedule = Arc::new(schedule);

        // The client and the certifier's output receiver are bound to named locals so they
        // stay alive until this constructor returns.
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (blocks_sender, _blocks_receiver) =
            mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );

        // Subscribe to the block broadcast before the core exists.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let block_receiver = signal_receivers.block_broadcast_receiver();

        let (commit_consumer, commit_output_receiver, blocks_output_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        let block_signer = signers.remove(own_index.value()).1;
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));

        let core = Core::new(
            context,
            leader_schedule,
            transaction_consumer,
            transaction_certifier.clone(),
            block_manager,
            commit_observer,
            signals,
            block_signer,
            dag_state.clone(),
            sync_last_known_own_block,
            round_tracker,
        );

        Self {
            core,
            transaction_certifier,
            signal_receivers,
            block_receiver,
            _commit_output_receiver: commit_output_receiver,
            _blocks_output_receiver: blocks_output_receiver,
            dag_state,
            store,
        }
    }

    /// Registers every block with the transaction certifier as voted with an empty reject
    /// list, then forwards the blocks to `Core::add_blocks` and returns its result.
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let voted_blocks = blocks
            .iter()
            .cloned()
            .map(|block| (block, Vec::new()))
            .collect();
        self.transaction_certifier.add_voted_blocks(voted_blocks);
        self.core.add_blocks(blocks)
    }
}
1486
1487#[cfg(test)]
1488mod test {
1489 use std::{collections::BTreeSet, time::Duration};
1490
1491 use consensus_config::{AuthorityIndex, Parameters};
1492 use consensus_types::block::TransactionIndex;
1493 use futures::{StreamExt, stream::FuturesUnordered};
1494 use mysten_metrics::monitored_mpsc;
1495 use sui_protocol_config::ProtocolConfig;
1496 use tokio::time::sleep;
1497
1498 use super::*;
1499 use crate::{
1500 CommitConsumerArgs, CommitIndex,
1501 block::{TestBlock, genesis_blocks},
1502 block_verifier::NoopBlockVerifier,
1503 commit::CommitAPI,
1504 leader_scoring::ReputationScores,
1505 storage::{Store, WriteBatch, mem_store::MemStore},
1506 test_dag_builder::DagBuilder,
1507 test_dag_parser::parse_dag,
1508 transaction::{BlockStatus, TransactionClient},
1509 };
1510
    #[tokio::test]
    /// Recovery from a store that holds complete rounds 1..=4 for every authority.
    ///
    /// Checks that starting a `Core` on top of that store signals round 5, proposes a round 5
    /// block with four round 4 ancestors, persists two commits, and resolves the block status
    /// subscription of the own round 1 block as `Sequenced`.
    async fn test_core_recover_from_store_for_full_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        // Build rounds 1..=4 with one block per authority; each block references every block
        // of the previous round (the genesis blocks for round 1).
        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );

                // Subscribe to the status of our own round 1 block only.
                if round == 1 && index == context.own_index {
                    let subscription =
                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
                    block_status_subscriptions.push(subscription);
                }

                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
        // Persist every block (genesis included) before any DAG state is created.
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // The DAG state is created only after the blocks are in the store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );

        let (commit_consumer, _commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Before the core starts, neither the store nor the DAG state knows about any commit.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // A second certifier (shadowing the one given to the commit observer) is created,
        // recovered from the GC round, and handed to the core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
        // Subscribe to the block broadcast before the core is created.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier.clone(),
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        // The round signal must already be at 5 after core creation.
        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        // A round 5 block must have been broadcast, referencing four round 4 ancestors.
        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 5);
        let ancestors = proposed_block.block.ancestors();

        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            assert_eq!(ancestor.round, 4);
        }

        // Flush the DAG state before reading commits back from the store.
        dag_state.write().flush();

        // Two commits are expected, both in the store and in the DAG state.
        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");
        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);

        // Every subscribed block status must resolve to `Sequenced`.
        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();
            assert!(matches!(status, BlockStatus::Sequenced(_)));
        }
    }
1650
    #[tokio::test]
    /// Recovery from a store where some authorities have no blocks in rounds 1..=4.
    ///
    /// Checks that the core signals round 5, proposes a round 4 block whose ancestors are
    /// round 3 blocks plus its own genesis block, and that two commits are persisted after
    /// an explicit `try_commit`.
    async fn test_core_recover_from_store_for_partial_round() {
        telemetry_subscribers::init_for_testing();

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Build rounds 1..=4, leaving out the first authorities of the committee iteration.
        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=4 {
            let mut this_round_blocks = Vec::new();

            // Round 4 skips `validity_threshold()` authorities; earlier rounds skip one.
            // The ancestor assertions below expect the own authority to be among the skipped
            // ones, since its only ancestor is at round 0.
            let authorities_to_skip = if round == 4 {
                context.committee.validity_threshold() as usize
            } else {
                1
            };

            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
                let block = TestBlock::new(round, index.value() as u32)
                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                    .build();
                this_round_blocks.push(VerifiedBlock::new_for_test(block));
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }

        // Persist every block (genesis included) before any DAG state is created.
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // The DAG state is created only after the blocks are in the store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );

        let (commit_consumer, _commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Before the core starts, neither the store nor the DAG state knows about any commit.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // A second certifier (shadowing the one given to the commit observer) is created,
        // recovered from the GC round, and handed to the core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
        // Subscribe to the block broadcast before the core is created.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        // The round signal is expected to be 5 although the first broadcast block is at round 4.
        let mut new_round = signal_receivers.new_round_receiver();
        assert_eq!(*new_round.borrow_and_update(), 5);

        // The first broadcast block is the round 4 proposal.
        let proposed_block = block_receiver
            .recv()
            .await
            .expect("A block should have been created");
        assert_eq!(proposed_block.block.round(), 4);
        let ancestors = proposed_block.block.ancestors();

        // Own ancestor is the genesis block; every other ancestor is at round 3.
        assert_eq!(ancestors.len(), 4);
        for ancestor in ancestors {
            if ancestor.author == context.own_index {
                assert_eq!(ancestor.round, 0);
            } else {
                assert_eq!(ancestor.round, 3);
            }
        }

        // Run the committer explicitly; its result is deliberately ignored.
        core.try_commit(vec![]).ok();

        // Flush the DAG state before reading commits back from the store.
        core.dag_state.write().flush();

        // Two commits are expected, both in the store and in the DAG state.
        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");
        assert_eq!(last_commit.index(), 2);
        assert_eq!(dag_state.read().last_commit_index(), 2);
        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(all_stored_commits.len(), 2);
    }
1792
    #[tokio::test]
    /// Starts a core on an empty store and checks the first proposal: a round 1 block by
    /// authority 0 that references all four genesis blocks, after which further proposal
    /// attempts return nothing and no commit exists.
    async fn test_core_propose_after_genesis() {
        telemetry_subscribers::init_for_testing();
        // Lower the transaction size limits to 2_000 bytes; the override is tied to `_guard`,
        // which lives until the end of the test.
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Subscribe to the block broadcast before the core is created.
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));

        let (commit_consumer, _commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        // Submit transactions "Transaction 0", "Transaction 1", ... until at least 1_000
        // serialized bytes have been sent.
        let mut total = 0;
        let mut index = 0;
        loop {
            let transaction =
                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
            total += transaction.len();
            index += 1;
            let _w = transaction_client
                .submit_no_wait(vec![transaction])
                .await
                .unwrap();

            if total >= 1_000 {
                break;
            }
        }

        // Receive the first broadcast block.
        let extended_block = block_receiver
            .recv()
            .await
            .expect("A new block should have been created");

        // It must be the round 1 block of authority 0 with four ancestors.
        assert_eq!(extended_block.block.round(), 1);
        assert_eq!(extended_block.block.author().value(), 0);
        assert_eq!(extended_block.block.ancestors().len(), 4);

        // Any included transactions must appear in submission order and fit the byte limit.
        let mut total = 0;
        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
            total += transaction.data().len() as u64;
            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
            assert_eq!(format!("Transaction {i}"), transaction);
        }
        assert!(total <= context.protocol_config.max_transactions_in_block_bytes());

        // Every ancestor must be one of the genesis blocks.
        let all_genesis = genesis_blocks(&context);

        for ancestor in extended_block.block.ancestors() {
            all_genesis
                .iter()
                .find(|block| block.reference() == *ancestor)
                .expect("Block should be found amongst genesis blocks");
        }

        // Proposing again yields no block, for either value of the boolean argument.
        assert!(core.try_propose(false).unwrap().is_none());
        assert!(core.try_propose(true).unwrap().is_none());

        // Flush the DAG state before reading commits back from the store.
        dag_state.write().flush();

        // No commit is expected at this point.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);
    }
1912
1913 #[tokio::test]
1914 async fn test_core_propose_once_receiving_a_quorum() {
1915 telemetry_subscribers::init_for_testing();
1916 let (context, _key_pairs) = Context::new_for_test(4);
1917 let mut core_fixture = CoreTextFixture::new(
1918 context.clone(),
1919 vec![1, 1, 1, 1],
1920 AuthorityIndex::new_for_test(0),
1921 false,
1922 )
1923 .await;
1924 let transaction_certifier = &core_fixture.transaction_certifier;
1925 let store = &core_fixture.store;
1926 let dag_state = &core_fixture.dag_state;
1927 let core = &mut core_fixture.core;
1928
1929 let mut expected_ancestors = BTreeSet::new();
1930
1931 let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1933 expected_ancestors.insert(block_1.reference());
1934 sleep(context.parameters.min_round_delay).await;
1936 transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1938 _ = core.add_blocks(vec![block_1]);
1939
1940 assert_eq!(core.last_proposed_round(), 1);
1941 expected_ancestors.insert(core.last_proposed_block().reference());
1942 assert!(core.try_propose(false).unwrap().is_none());
1944
1945 let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1947 expected_ancestors.insert(block_2.reference());
1948 sleep(context.parameters.min_round_delay).await;
1950 transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1952 _ = core.add_blocks(vec![block_2.clone()]);
1953
1954 assert_eq!(core.last_proposed_round(), 2);
1955
1956 let proposed_block = core.last_proposed_block();
1957 assert_eq!(proposed_block.round(), 2);
1958 assert_eq!(proposed_block.author(), context.own_index);
1959 assert_eq!(proposed_block.ancestors().len(), 3);
1960 let ancestors = proposed_block.ancestors();
1961 let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1962 assert_eq!(ancestors, expected_ancestors);
1963
1964 let transaction_votes = proposed_block.transaction_votes();
1965 assert_eq!(transaction_votes.len(), 1);
1966 let transaction_vote = transaction_votes.first().unwrap();
1967 assert_eq!(transaction_vote.block_ref, block_2.reference());
1968 assert_eq!(transaction_vote.rejects, vec![1, 4]);
1969
1970 dag_state.write().flush();
1972
1973 let last_commit = store.read_last_commit().unwrap();
1975 assert!(last_commit.is_none());
1976 assert_eq!(dag_state.read().last_commit_index(), 0);
1977 }
1978
    #[tokio::test]
    /// Recovers a core from a stored 8-round DAG with GC depth 2 and checks that five commits
    /// are persisted and that the status subscriptions for own blocks of rounds 1..=5 resolve
    /// to `Sequenced` (rounds 1 and 5) or `GarbageCollected` (rounds 2 and 3).
    async fn test_commit_and_notify_for_block_status() {
        telemetry_subscribers::init_for_testing();
        let (mut context, mut key_pairs) = Context::new_for_test(4);
        const GC_DEPTH: u32 = 2;

        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut block_status_subscriptions = FuturesUnordered::new();

        // DAG description consumed by `parse_dag`. Rounds 3 and 4 restrict which blocks of A
        // are referenced, and round 4 has no block from A.
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : { * },
            Round 3 : {
                A -> [*],
                B -> [-A2],
                C -> [-A2],
                D -> [-A2],
            },
            Round 4 : {
                B -> [-A3],
                C -> [-A3],
                D -> [-A3],
            },
            Round 5 : {
                A -> [A3, B4, C4, D4]
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 6 : { * },
            Round 7 : { * },
            Round 8 : { * },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();

        // Subscribe to the status of every own block in rounds 1..=5.
        for block in dag_builder.blocks(1..=5) {
            if block.author() == context.own_index {
                let subscription =
                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
                block_status_subscriptions.push(subscription);
            }
        }

        // Persist rounds 1..=8 before any DAG state is created.
        store
            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
            .expect("Storage error");

        // The DAG state is created only after the blocks are in the store.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );

        let (commit_consumer, _commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Flush the DAG state before reading commits back from the store.
        dag_state.write().flush();

        // Before the core starts, neither the store nor the DAG state knows about any commit.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // A second certifier (shadowing the one given to the commit observer) is created,
        // recovered from the GC round, and handed to the core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
        // Keep one block broadcast receiver alive while the core runs.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let _core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        // Flush again so that commits made during core creation reach the store.
        dag_state.write().flush();

        let last_commit = store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");

        // Five commits are expected after recovery.
        assert_eq!(last_commit.index(), 5);

        // Own blocks of rounds 1 and 5 may be sequenced; those of rounds 2 and 3 may be
        // garbage collected. Any other combination fails the test.
        while let Some(result) = block_status_subscriptions.next().await {
            let status = result.unwrap();

            match status {
                BlockStatus::Sequenced(block_ref) => {
                    assert!(block_ref.round == 1 || block_ref.round == 5);
                }
                BlockStatus::GarbageCollected(block_ref) => {
                    assert!(block_ref.round == 2 || block_ref.round == 3);
                }
            }
        }
    }
2125
    #[tokio::test]
    /// Adds an 11-round DAG to a freshly created core in a single `add_blocks` call, with the
    /// round 1 block of authority index 3 left out, and checks that the core ends up having
    /// proposed for round 12.
    async fn test_multiple_commits_advance_threshold_clock() {
        telemetry_subscribers::init_for_testing();
        let (mut context, mut key_pairs) = Context::new_for_test(4);
        const GC_DEPTH: u32 = 2;

        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // DAG description consumed by `parse_dag`. A has blocks only in rounds 1, 4 and 5;
        // round 2 excludes D1 and round 6 lists D1 explicitly as an ancestor.
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : { * },
            Round 2 : {
                B -> [-D1],
                C -> [-D1],
                D -> [-D1],
            },
            Round 3 : {
                B -> [*],
                C -> [*]
                D -> [*],
            },
            Round 4 : {
                A -> [*],
                B -> [*],
                C -> [*]
                D -> [*],
            },
            Round 5 : {
                A -> [*],
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 6 : {
                B -> [A5, B5, C5, D1],
                C -> [A5, B5, C5, D1],
                D -> [A5, B5, C5, D1],
            },
            Round 7 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 8 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 9 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 10 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 11 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
        dag_builder.print();

        // Unlike the recovery tests, nothing is written to the store here: the DAG state
        // starts empty and the blocks are added through the core further down.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );

        let (commit_consumer, _commit_receiver, _transaction_receiver) =
            CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;

        // Flush the DAG state before reading commits back from the store.
        dag_state.write().flush();

        // No commit exists before the core starts.
        let last_commit = store.read_last_commit().unwrap();
        assert!(last_commit.is_none());
        assert_eq!(dag_state.read().last_commit_index(), 0);

        // A second certifier (shadowing the one given to the commit observer) is created
        // and handed to the core.
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        // Keep one block broadcast receiver alive while the core runs.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        // The core is created with `sync_last_known_own_block` set to `true`.
        let mut core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier.clone(),
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
            round_tracker,
        );
        // Report round 4 as the last known proposed round.
        core.set_last_known_proposed_round(4);

        // Register every block of rounds 1..=11 as voted, in round order, then hand all of
        // them to the core except the round 1 block of authority index 3 (presumably `D1`
        // in the DAG above).
        let mut all_blocks = dag_builder.blocks(1..=11);
        all_blocks.sort_by_key(|b| b.round());
        let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
            all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
        transaction_certifier.add_voted_blocks(voted_blocks);
        let blocks: Vec<VerifiedBlock> = all_blocks
            .into_iter()
            .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
            .collect();
        core.add_blocks(blocks).expect("Should not fail");

        // The core must have proposed for round 12.
        assert_eq!(core.last_proposed_round(), 12);
    }
2290
2291 #[tokio::test]
2292 async fn test_core_set_min_propose_round() {
2293 telemetry_subscribers::init_for_testing();
2294 let (context, mut key_pairs) = Context::new_for_test(4);
2295 let context = Arc::new(context.with_parameters(Parameters {
2296 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2297 ..Default::default()
2298 }));
2299
2300 let store = Arc::new(MemStore::new());
2301 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2302
2303 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2304 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2305 context.clone(),
2306 dag_state.clone(),
2307 ));
2308
2309 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2310 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2311 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2312 let (blocks_sender, _blocks_receiver) =
2313 monitored_mpsc::unbounded_channel("consensus_block_output");
2314 let transaction_certifier = TransactionCertifier::new(
2315 context.clone(),
2316 Arc::new(NoopBlockVerifier {}),
2317 dag_state.clone(),
2318 blocks_sender,
2319 );
2320 let _block_receiver = signal_receivers.block_broadcast_receiver();
2322
2323 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2324 CommitConsumerArgs::new(0, 0);
2325 let commit_observer = CommitObserver::new(
2326 context.clone(),
2327 commit_consumer,
2328 dag_state.clone(),
2329 transaction_certifier.clone(),
2330 leader_schedule.clone(),
2331 )
2332 .await;
2333
2334 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2335 let mut core = Core::new(
2336 context.clone(),
2337 leader_schedule,
2338 transaction_consumer,
2339 transaction_certifier.clone(),
2340 block_manager,
2341 commit_observer,
2342 signals,
2343 key_pairs.remove(context.own_index.value()).1,
2344 dag_state.clone(),
2345 true,
2346 round_tracker,
2347 );
2348
2349 assert_eq!(
2351 core.last_proposed_round(),
2352 GENESIS_ROUND,
2353 "No block should have been created other than genesis"
2354 );
2355
2356 assert!(core.try_propose(true).unwrap().is_none());
2358
2359 let mut builder = DagBuilder::new(context.clone());
2361 builder.layers(1..=10).build();
2362
2363 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2364
2365 transaction_certifier
2367 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2368 assert!(core.add_blocks(blocks).unwrap().is_empty());
2369
2370 core.round_tracker.write().update_from_probe(
2371 vec![
2372 vec![10, 10, 10, 10],
2373 vec![10, 10, 10, 10],
2374 vec![10, 10, 10, 10],
2375 vec![10, 10, 10, 10],
2376 ],
2377 vec![
2378 vec![10, 10, 10, 10],
2379 vec![10, 10, 10, 10],
2380 vec![10, 10, 10, 10],
2381 vec![10, 10, 10, 10],
2382 ],
2383 );
2384
2385 assert!(core.try_propose(true).unwrap().is_none());
2387
2388 core.set_last_known_proposed_round(10);
2391
2392 let block = core.try_propose(true).expect("No error").unwrap();
2393 assert_eq!(block.round(), 11);
2394 assert_eq!(block.ancestors().len(), 4);
2395
2396 let our_ancestor_included = block.ancestors()[0];
2397 assert_eq!(our_ancestor_included.author, context.own_index);
2398 assert_eq!(our_ancestor_included.round, 10);
2399 }
2400
2401 #[tokio::test(flavor = "current_thread", start_paused = true)]
2402 async fn test_core_try_new_block_leader_timeout() {
2403 telemetry_subscribers::init_for_testing();
2404
2405 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2412 let now = context.clock.timestamp_utc_ms();
2414 let max_timestamp = blocks
2415 .iter()
2416 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2417 .map(|block| block.timestamp_ms())
2418 .unwrap_or(0);
2419
2420 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2421 sleep(wait_time).await;
2422 }
2423
2424 let (context, _) = Context::new_for_test(4);
2425 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2427
2428 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2433
2434 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2436 for round in 1..=3 {
2437 let mut this_round_blocks = Vec::new();
2438
2439 for core_fixture in cores.iter_mut() {
2440 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2441
2442 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2443
2444 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2446 assert_eq!(round - 1, r);
2447 if core_fixture.core.last_proposed_round() == r {
2448 core_fixture
2450 .core
2451 .try_propose(true)
2452 .unwrap()
2453 .unwrap_or_else(|| {
2454 panic!("Block should have been proposed for round {}", round)
2455 });
2456 }
2457 }
2458
2459 assert_eq!(core_fixture.core.last_proposed_round(), round);
2460
2461 this_round_blocks.push(core_fixture.core.last_proposed_block());
2462 }
2463
2464 last_round_blocks = this_round_blocks;
2465 }
2466
2467 for core_fixture in cores.iter_mut() {
2470 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2471
2472 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2473 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2474 }
2475
2476 for core_fixture in cores.iter_mut() {
2479 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2480 assert_eq!(core_fixture.core.last_proposed_round(), 4);
2481
2482 core_fixture.dag_state.write().flush();
2484
2485 let last_commit = core_fixture
2487 .store
2488 .read_last_commit()
2489 .unwrap()
2490 .expect("last commit should be set");
2491 assert_eq!(last_commit.index(), 1);
2494 let all_stored_commits = core_fixture
2495 .store
2496 .scan_commits((0..=CommitIndex::MAX).into())
2497 .unwrap();
2498 assert_eq!(all_stored_commits.len(), 1);
2499 }
2500 }
2501
2502 #[tokio::test(flavor = "current_thread", start_paused = true)]
2503 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2504 telemetry_subscribers::init_for_testing();
2505
2506 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2513 let now = context.clock.timestamp_utc_ms();
2515 let max_timestamp = blocks
2516 .iter()
2517 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2518 .map(|block| block.timestamp_ms())
2519 .unwrap_or(0);
2520
2521 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2522 sleep(wait_time).await;
2523 }
2524
2525 let (mut context, _) = Context::new_for_test(5);
2526 context
2527 .protocol_config
2528 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2529
2530 let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2532 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2533
2534 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2536 for round in 1..=30 {
2537 let mut this_round_blocks = Vec::new();
2538
2539 for core_fixture in cores.iter_mut() {
2540 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2541
2542 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2543
2544 core_fixture.core.round_tracker.write().update_from_probe(
2545 vec![
2546 vec![round, round, round, round, 0],
2547 vec![round, round, round, round, 0],
2548 vec![round, round, round, round, 0],
2549 vec![round, round, round, round, 0],
2550 vec![0, 0, 0, 0, 0],
2551 ],
2552 vec![
2553 vec![round, round, round, round, 0],
2554 vec![round, round, round, round, 0],
2555 vec![round, round, round, round, 0],
2556 vec![round, round, round, round, 0],
2557 vec![0, 0, 0, 0, 0],
2558 ],
2559 );
2560
2561 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2563 assert_eq!(round - 1, r);
2564 if core_fixture.core.last_proposed_round() == r {
2565 core_fixture
2567 .core
2568 .try_propose(true)
2569 .unwrap()
2570 .unwrap_or_else(|| {
2571 panic!("Block should have been proposed for round {}", round)
2572 });
2573 }
2574 }
2575
2576 assert_eq!(core_fixture.core.last_proposed_round(), round);
2577
2578 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2579 }
2580
2581 last_round_blocks = this_round_blocks;
2582 }
2583
2584 for round in 31..=40 {
2586 let mut this_round_blocks = Vec::new();
2587
2588 for core_fixture in all_cores.iter_mut() {
2589 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2590
2591 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2592
2593 core_fixture.core.round_tracker.write().update_from_probe(
2596 vec![
2597 vec![round, round, round, round, 0],
2598 vec![round, round, round, round, 0],
2599 vec![round, round, round, round, 0],
2600 vec![round, round, round, round, 0],
2601 vec![0, 0, 0, 0, 0],
2602 ],
2603 vec![
2604 vec![round, round, round, round, 0],
2605 vec![round, round, round, round, 0],
2606 vec![round, round, round, round, 0],
2607 vec![round, round, round, round, 0],
2608 vec![0, 0, 0, 0, 0],
2609 ],
2610 );
2611
2612 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2614 assert_eq!(round - 1, r);
2615 if core_fixture.core.last_proposed_round() == r {
2616 core_fixture
2618 .core
2619 .try_propose(true)
2620 .unwrap()
2621 .unwrap_or_else(|| {
2622 panic!("Block should have been proposed for round {}", round)
2623 });
2624 }
2625 }
2626
2627 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2628
2629 for block in this_round_blocks.iter() {
2630 if block.author() != AuthorityIndex::new_for_test(4) {
2631 assert_eq!(block.ancestors().len(), 4);
2634 } else {
2635 assert_eq!(block.ancestors().len(), 5);
2638 }
2639 }
2640 }
2641
2642 last_round_blocks = this_round_blocks;
2643 }
2644 }
2645
/// End-to-end scenario for ancestor selection on a 7-authority committee.
///
/// The test feeds the core DAG layers in which specific authorities are passed to
/// `DagBuilder::skip_block()`, updates the round tracker with probe matrices, and after
/// each step asserts the round of the proposed block, how many ancestors it includes,
/// and which ancestors are reported as excluded on the broadcast channel.
#[tokio::test]
async fn test_smart_ancestor_selection() {
    telemetry_subscribers::init_for_testing();
    let (mut context, mut key_pairs) = Context::new_for_test(7);
    context
        .protocol_config
        .set_consensus_bad_nodes_stake_threshold_for_testing(33);
    let context = Arc::new(context.with_parameters(Parameters {
        sync_last_known_own_block_timeout: Duration::from_millis(2_000),
        ..Default::default()
    }));

    let store = Arc::new(MemStore::new());
    let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

    let block_manager = BlockManager::new(context.clone(), dag_state.clone());
    let leader_schedule = Arc::new(
        LeaderSchedule::from_store(context.clone(), dag_state.clone())
            .with_num_commits_per_schedule(10),
    );

    let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
    let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
    let (blocks_sender, _blocks_receiver) =
        monitored_mpsc::unbounded_channel("consensus_block_output");
    let transaction_certifier = TransactionCertifier::new(
        context.clone(),
        Arc::new(NoopBlockVerifier {}),
        dag_state.clone(),
        blocks_sender,
    );
    let (signals, signal_receivers) = CoreSignals::new(context.clone());
    // Used below to observe the blocks the core broadcasts, including excluded ancestors.
    let mut block_receiver = signal_receivers.block_broadcast_receiver();

    let (commit_consumer, _commit_receiver, _transaction_receiver) =
        CommitConsumerArgs::new(0, 0);
    let commit_observer = CommitObserver::new(
        context.clone(),
        commit_consumer,
        dag_state.clone(),
        transaction_certifier.clone(),
        leader_schedule.clone(),
    )
    .await;

    let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
    let mut core = Core::new(
        context.clone(),
        leader_schedule,
        transaction_consumer,
        transaction_certifier.clone(),
        block_manager,
        commit_observer,
        signals,
        key_pairs.remove(context.own_index.value()).1,
        dag_state.clone(),
        true,
        round_tracker.clone(),
    );

    // Nothing has been added to the DAG yet, so the last proposed round is still genesis.
    assert_eq!(
        core.last_proposed_round(),
        GENESIS_ROUND,
        "No block should have been created other than genesis"
    );

    // Even a forced proposal yields nothing at this point.
    assert!(core.try_propose(true).unwrap().is_none());

    // Rounds 1..=12 with `skip_block()` applied to authority 1.
    // NOTE(review): presumably this means authority 1 produces no blocks in these
    // layers — confirm against DagBuilder.
    let mut builder = DagBuilder::new(context.clone());
    builder
        .layers(1..=12)
        .authorities(vec![AuthorityIndex::new_for_test(1)])
        .skip_block()
        .build();
    let blocks = builder.blocks(1..=12);
    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());
    core.set_last_known_proposed_round(12);

    // Both probe matrices report round 12 everywhere except row 1, which is all zeros.
    round_tracker.write().update_from_probe(
        vec![
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
        ],
        vec![
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
            vec![12, 12, 12, 12, 12, 12, 12],
        ],
    );

    // The forced proposal for round 13 is expected to carry all 7 ancestors.
    let block = core.try_propose(true).expect("No error").unwrap();
    assert_eq!(block.round(), 13);
    assert_eq!(block.ancestors().len(), 7);

    // Rounds 13..=14 with `skip_block()` applied to authority 0 (the author of the
    // block asserted on above and below, per `core.context.own_index` checks later).
    builder
        .layers(13..=14)
        .authorities(vec![AuthorityIndex::new_for_test(0)])
        .skip_block()
        .build();
    let blocks = builder.blocks(13..=14);
    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());

    // The round 15 proposal is expected to include only 6 ancestors.
    let block = core.try_propose(true).expect("No error").unwrap();
    assert_eq!(block.round(), 15);
    assert_eq!(block.ancestors().len(), 6);

    // Snapshot the builder's ancestors so that round 15 can be built a second time
    // from the same starting point via `override_last_ancestors` below.
    let round_14_ancestors = builder.last_ancestors.clone();
    // First part of round 15: authorities 0, 5 and 6 are passed to `skip_block()`.
    builder
        .layer(15)
        .authorities(vec![
            AuthorityIndex::new_for_test(0),
            AuthorityIndex::new_for_test(5),
            AuthorityIndex::new_for_test(6),
        ])
        .skip_block()
        .build();
    let blocks = builder.blocks(15..=15);
    // Authority 1's round 15 block; later asserted to be the single excluded ancestor.
    let authority_1_excluded_block_reference = blocks
        .iter()
        .find(|block| block.author() == AuthorityIndex::new_for_test(1))
        .unwrap()
        .reference();
    // Wait out the minimum round delay before handing the blocks to the core.
    sleep(context.parameters.min_round_delay).await;
    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());
    // Adding this first part of round 15 must not advance the core past round 15.
    assert_eq!(core.last_proposed_block().round(), 15);

    // Second part of round 15, built from the round 14 snapshot, this time passing
    // authorities 0..=4 to `skip_block()`.
    builder
        .layer(15)
        .authorities(vec![
            AuthorityIndex::new_for_test(0),
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
            AuthorityIndex::new_for_test(3),
            AuthorityIndex::new_for_test(4),
        ])
        .skip_block()
        .override_last_ancestors(round_14_ancestors)
        .build();
    let blocks = builder.blocks(15..=15);
    // References of the round 15 blocks returned by the builder; reused as the
    // ancestor override when building round 16.
    let round_15_ancestors: Vec<BlockRef> = blocks
        .iter()
        .filter(|block| block.round() == 15)
        .map(|block| block.reference())
        .collect();
    // Expected ancestors of the round 16 proposal: our own last block followed by the
    // round 15 blocks, with authority 1's block filtered out.
    let included_block_references = iter::once(&core.last_proposed_block())
        .chain(blocks.iter())
        .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
        .map(|block| block.reference())
        .collect::<Vec<_>>();

    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());
    // No explicit `try_propose` call here: adding the blocks is expected to have
    // produced the round 16 block on its own.
    assert_eq!(core.last_proposed_block().round(), 16);

    // Drain the broadcast channel until the round 16 block shows up; each receive is
    // bounded by a one second timeout.
    let extended_block = loop {
        let extended_block =
            tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
                .await
                .unwrap()
                .unwrap();
        if extended_block.block.round() == 16 {
            break extended_block;
        }
    };
    assert_eq!(extended_block.block.round(), 16);
    assert_eq!(extended_block.block.author(), core.context.own_index);
    assert_eq!(extended_block.block.ancestors().len(), 6);
    assert_eq!(extended_block.block.ancestors(), included_block_references);
    // Exactly one ancestor is reported as excluded: authority 1's round 15 block.
    assert_eq!(extended_block.excluded_ancestors.len(), 1);
    assert_eq!(
        extended_block.excluded_ancestors[0],
        authority_1_excluded_block_reference
    );

    // Round 16 built on top of `round_15_ancestors`, with authorities 0, 5 and 6
    // passed to `skip_block()`.
    builder
        .layer(16)
        .authorities(vec![
            AuthorityIndex::new_for_test(0),
            AuthorityIndex::new_for_test(5),
            AuthorityIndex::new_for_test(6),
        ])
        .skip_block()
        .override_last_ancestors(round_15_ancestors)
        .build();
    let blocks = builder.blocks(16..=16);
    sleep(context.parameters.min_round_delay).await;
    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());
    // Adding these blocks alone must not trigger a new proposal.
    assert_eq!(core.last_proposed_block().round(), 16);

    // A forced proposal produces round 17 with only 5 ancestors.
    let block = core.try_propose(true).expect("No error").unwrap();
    assert_eq!(block.round(), 17);
    assert_eq!(block.ancestors().len(), 5);

    // The broadcast round 17 block must report no excluded ancestors.
    let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(extended_block.block.round(), 17);
    assert_eq!(extended_block.block.author(), core.context.own_index);
    assert_eq!(extended_block.block.ancestors().len(), 5);
    assert_eq!(extended_block.excluded_ancestors.len(), 0);

    // Rounds 17..=22 with `skip_block()` applied to authority 0.
    builder
        .layers(17..=22)
        .authorities(vec![AuthorityIndex::new_for_test(0)])
        .skip_block()
        .build();
    let blocks = builder.blocks(17..=22);

    // Both probe matrices now report round 22 for every authority, row 1 included.
    round_tracker.write().update_from_probe(
        vec![
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
        ],
        vec![
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
            vec![22, 22, 22, 22, 22, 22, 22],
        ],
    );

    // Expected ancestors of the round 23 proposal: our own last block plus every
    // round 22 block.
    let included_block_references = iter::once(&core.last_proposed_block())
        .chain(blocks.iter())
        .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
        .map(|block| block.reference())
        .collect::<Vec<_>>();

    sleep(context.parameters.min_round_delay).await;
    transaction_certifier
        .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
    assert!(core.add_blocks(blocks).unwrap().is_empty());
    // Adding rounds 17..=22 is expected to take the core straight to round 23.
    assert_eq!(core.last_proposed_block().round(), 23);

    // The broadcast round 23 block includes all 7 ancestors and excludes none.
    let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(extended_block.block.round(), 23);
    assert_eq!(extended_block.block.author(), core.context.own_index);
    assert_eq!(extended_block.block.ancestors().len(), 7);
    assert_eq!(extended_block.block.ancestors(), included_block_references);
    assert_eq!(extended_block.excluded_ancestors.len(), 0);
}
2948
2949 #[tokio::test]
2950 async fn test_excluded_ancestor_limit() {
2951 telemetry_subscribers::init_for_testing();
2952 let (context, mut key_pairs) = Context::new_for_test(4);
2953 let context = Arc::new(context.with_parameters(Parameters {
2954 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2955 ..Default::default()
2956 }));
2957
2958 let store = Arc::new(MemStore::new());
2959 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2960
2961 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2962 let leader_schedule = Arc::new(
2963 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2964 .with_num_commits_per_schedule(10),
2965 );
2966
2967 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2968 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2969 let (blocks_sender, _blocks_receiver) =
2970 monitored_mpsc::unbounded_channel("consensus_block_output");
2971 let transaction_certifier = TransactionCertifier::new(
2972 context.clone(),
2973 Arc::new(NoopBlockVerifier {}),
2974 dag_state.clone(),
2975 blocks_sender,
2976 );
2977 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2978 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2980
2981 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2982 CommitConsumerArgs::new(0, 0);
2983 let commit_observer = CommitObserver::new(
2984 context.clone(),
2985 commit_consumer,
2986 dag_state.clone(),
2987 transaction_certifier.clone(),
2988 leader_schedule.clone(),
2989 )
2990 .await;
2991
2992 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2993 let mut core = Core::new(
2994 context.clone(),
2995 leader_schedule,
2996 transaction_consumer,
2997 transaction_certifier.clone(),
2998 block_manager,
2999 commit_observer,
3000 signals,
3001 key_pairs.remove(context.own_index.value()).1,
3002 dag_state.clone(),
3003 true,
3004 round_tracker,
3005 );
3006
3007 assert_eq!(
3009 core.last_proposed_round(),
3010 GENESIS_ROUND,
3011 "No block should have been created other than genesis"
3012 );
3013
3014 let mut builder = DagBuilder::new(context.clone());
3016 builder.layers(1..=3).build();
3017
3018 builder
3022 .layer(4)
3023 .authorities(vec![AuthorityIndex::new_for_test(1)])
3024 .equivocate(9)
3025 .build();
3026 let blocks = builder.blocks(1..=4);
3027
3028 transaction_certifier
3030 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3031 assert!(core.add_blocks(blocks).unwrap().is_empty());
3032 core.set_last_known_proposed_round(3);
3033
3034 let block = core.try_propose(true).expect("No error").unwrap();
3035 assert_eq!(block.round(), 5);
3036 assert_eq!(block.ancestors().len(), 4);
3037
3038 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3040 .await
3041 .unwrap()
3042 .unwrap();
3043 assert_eq!(extended_block.block.round(), 5);
3044 assert_eq!(extended_block.block.author(), core.context.own_index);
3045 assert_eq!(extended_block.block.ancestors().len(), 4);
3046 assert_eq!(extended_block.excluded_ancestors.len(), 8);
3047 }
3048
3049 #[tokio::test]
3050 async fn test_core_set_propagation_delay_per_authority() {
3051 telemetry_subscribers::init_for_testing();
3053 let (context, mut key_pairs) = Context::new_for_test(4);
3054 let context = Arc::new(context);
3055 let store = Arc::new(MemStore::new());
3056 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3057
3058 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3059 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3060 context.clone(),
3061 dag_state.clone(),
3062 ));
3063
3064 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3065 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3066 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3067 let (blocks_sender, _blocks_receiver) =
3068 monitored_mpsc::unbounded_channel("consensus_block_output");
3069 let transaction_certifier = TransactionCertifier::new(
3070 context.clone(),
3071 Arc::new(NoopBlockVerifier {}),
3072 dag_state.clone(),
3073 blocks_sender,
3074 );
3075 let _block_receiver = signal_receivers.block_broadcast_receiver();
3077
3078 let (commit_consumer, _commit_receiver, _transaction_receiver) =
3079 CommitConsumerArgs::new(0, 0);
3080 let commit_observer = CommitObserver::new(
3081 context.clone(),
3082 commit_consumer,
3083 dag_state.clone(),
3084 transaction_certifier.clone(),
3085 leader_schedule.clone(),
3086 )
3087 .await;
3088
3089 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3090 let mut core = Core::new(
3091 context.clone(),
3092 leader_schedule,
3093 transaction_consumer,
3094 transaction_certifier.clone(),
3095 block_manager,
3096 commit_observer,
3097 signals,
3098 key_pairs.remove(context.own_index.value()).1,
3099 dag_state.clone(),
3100 false,
3101 round_tracker.clone(),
3102 );
3103
3104 let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3109 transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3110 dag_state.write().accept_block(test_block);
3112
3113 round_tracker.write().update_from_probe(
3114 vec![
3115 vec![0, 0, 0, 0],
3116 vec![0, 0, 0, 0],
3117 vec![0, 0, 0, 0],
3118 vec![0, 0, 0, 0],
3119 ],
3120 vec![
3121 vec![0, 0, 0, 0],
3122 vec![0, 0, 0, 0],
3123 vec![0, 0, 0, 0],
3124 vec![0, 0, 0, 0],
3125 ],
3126 );
3127
3128 assert!(core.try_propose(true).unwrap().is_none());
3130
3131 round_tracker.write().update_from_probe(
3135 vec![
3136 vec![1000, 1000, 1000, 1000],
3137 vec![1000, 1000, 1000, 1000],
3138 vec![1000, 1000, 1000, 1000],
3139 vec![1000, 1000, 1000, 1000],
3140 ],
3141 vec![
3142 vec![1000, 1000, 1000, 1000],
3143 vec![1000, 1000, 1000, 1000],
3144 vec![1000, 1000, 1000, 1000],
3145 vec![1000, 1000, 1000, 1000],
3146 ],
3147 );
3148
3149 for author in 1..4 {
3152 let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3153 transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3154 dag_state.write().accept_block(block);
3156 }
3157
3158 assert!(core.try_propose(true).unwrap().is_some());
3160 }
3161
3162 #[tokio::test(flavor = "current_thread", start_paused = true)]
3163 async fn test_leader_schedule_change() {
3164 telemetry_subscribers::init_for_testing();
3165 let default_params = Parameters::default();
3166
3167 let (context, _) = Context::new_for_test(4);
3168 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3170
3171 let mut last_round_blocks = Vec::new();
3173 for round in 1..=30 {
3174 let mut this_round_blocks = Vec::new();
3175
3176 sleep(default_params.min_round_delay).await;
3178
3179 for core_fixture in &mut cores {
3180 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3183
3184 core_fixture.core.round_tracker.write().update_from_probe(
3185 vec![
3186 vec![round, round, round, round],
3187 vec![round, round, round, round],
3188 vec![round, round, round, round],
3189 vec![round, round, round, round],
3190 ],
3191 vec![
3192 vec![round, round, round, round],
3193 vec![round, round, round, round],
3194 vec![round, round, round, round],
3195 vec![round, round, round, round],
3196 ],
3197 );
3198
3199 let new_round = receive(
3201 Duration::from_secs(1),
3202 core_fixture.signal_receivers.new_round_receiver(),
3203 )
3204 .await;
3205 assert_eq!(new_round, round);
3206
3207 let extended_block = tokio::time::timeout(
3209 Duration::from_secs(1),
3210 core_fixture.block_receiver.recv(),
3211 )
3212 .await
3213 .unwrap()
3214 .unwrap();
3215 assert_eq!(extended_block.block.round(), round);
3216 assert_eq!(
3217 extended_block.block.author(),
3218 core_fixture.core.context.own_index
3219 );
3220
3221 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3223
3224 let block = core_fixture.core.last_proposed_block();
3225
3226 assert_eq!(
3228 block.ancestors().len(),
3229 core_fixture.core.context.committee.size()
3230 );
3231 for ancestor in block.ancestors() {
3232 if block.round() > 1 {
3233 assert!(
3235 last_round_blocks
3236 .iter()
3237 .any(|block| block.reference() == *ancestor),
3238 "Reference from previous round should be added"
3239 );
3240 }
3241 }
3242 }
3243
3244 last_round_blocks = this_round_blocks;
3245 }
3246
3247 for core_fixture in cores {
3248 core_fixture.dag_state.write().flush();
3250
3251 let last_commit = core_fixture
3253 .store
3254 .read_last_commit()
3255 .unwrap()
3256 .expect("last commit should be set");
3257 assert_eq!(last_commit.index(), 27);
3261 let all_stored_commits = core_fixture
3262 .store
3263 .scan_commits((0..=CommitIndex::MAX).into())
3264 .unwrap();
3265 assert_eq!(all_stored_commits.len(), 27);
3266 assert_eq!(
3267 core_fixture
3268 .core
3269 .leader_schedule
3270 .leader_swap_table
3271 .read()
3272 .bad_nodes
3273 .len(),
3274 1
3275 );
3276 assert_eq!(
3277 core_fixture
3278 .core
3279 .leader_schedule
3280 .leader_swap_table
3281 .read()
3282 .good_nodes
3283 .len(),
3284 1
3285 );
3286 let expected_reputation_scores =
3287 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3288 assert_eq!(
3289 core_fixture
3290 .core
3291 .leader_schedule
3292 .leader_swap_table
3293 .read()
3294 .reputation_scores,
3295 expected_reputation_scores
3296 );
3297 }
3298 }
3299
3300 #[tokio::test]
3301 async fn test_filter_new_commits() {
3302 telemetry_subscribers::init_for_testing();
3303
3304 let (context, _key_pairs) = Context::new_for_test(4);
3305 let context = context.with_parameters(Parameters {
3306 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3307 ..Default::default()
3308 });
3309
3310 let authority_index = AuthorityIndex::new_for_test(0);
3311 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3312 let mut core = core.core;
3313
3314 assert_eq!(
3316 core.last_proposed_round(),
3317 GENESIS_ROUND,
3318 "No block should have been created other than genesis"
3319 );
3320
3321 let mut dag_builder = DagBuilder::new(core.context.clone());
3323 dag_builder.layers(1..=12).build();
3324
3325 dag_builder.print();
3327 let blocks = dag_builder.blocks(1..=6);
3328
3329 for block in blocks {
3330 core.dag_state.write().accept_block(block);
3331 }
3332
3333 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3335
3336 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3338
3339 assert_eq!(committed_sub_dags.len(), 4);
3341
3342 println!("Case 1. Provide certified commits that are all before the last committed round.");
3344
3345 let certified_commits = sub_dags_and_commits
3347 .iter()
3348 .take(4)
3349 .map(|(_, c)| c)
3350 .cloned()
3351 .collect::<Vec<_>>();
3352 assert!(
3353 certified_commits.last().unwrap().index()
3354 <= committed_sub_dags.last().unwrap().commit_ref.index,
3355 "Highest certified commit should older than the highest committed index."
3356 );
3357
3358 let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3359
3360 assert!(certified_commits.is_empty());
3362
3363 println!("Case 2. Provide certified commits that are all after the last committed round.");
3364
3365 let certified_commits = sub_dags_and_commits
3367 .iter()
3368 .take(5)
3369 .map(|(_, c)| c.clone())
3370 .collect::<Vec<_>>();
3371
3372 let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3373
3374 assert_eq!(certified_commits.len(), 1);
3376 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3377
3378 println!(
3379 "Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1."
3380 );
3381
3382 let certified_commits = sub_dags_and_commits
3384 .iter()
3385 .skip(5)
3386 .take(1)
3387 .map(|(_, c)| c.clone())
3388 .collect::<Vec<_>>();
3389
3390 let err = core
3391 .filter_new_commits(certified_commits.clone())
3392 .unwrap_err();
3393 match err {
3394 ConsensusError::UnexpectedCertifiedCommitIndex {
3395 expected_commit_index: 5,
3396 commit_index: 6,
3397 } => (),
3398 _ => panic!("Unexpected error: {:?}", err),
3399 }
3400 }
3401
3402 #[tokio::test]
3403 async fn test_add_certified_commits() {
3404 telemetry_subscribers::init_for_testing();
3405
3406 let (context, _key_pairs) = Context::new_for_test(4);
3407 let context = context.with_parameters(Parameters {
3408 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3409 ..Default::default()
3410 });
3411
3412 let authority_index = AuthorityIndex::new_for_test(0);
3413 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3414 let store = core.store.clone();
3415 let mut core = core.core;
3416
3417 assert_eq!(
3419 core.last_proposed_round(),
3420 GENESIS_ROUND,
3421 "No block should have been created other than genesis"
3422 );
3423
3424 let mut dag_builder = DagBuilder::new(core.context.clone());
3426 dag_builder.layers(1..=12).build();
3427
3428 dag_builder.print();
3430 let blocks = dag_builder.blocks(1..=6);
3431
3432 for block in blocks {
3433 core.dag_state.write().accept_block(block);
3434 }
3435
3436 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3438
3439 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3441
3442 assert_eq!(committed_sub_dags.len(), 4);
3444
3445 core.dag_state.write().flush();
3447
3448 println!("Case 1. Provide no certified commits. No commit should happen.");
3449
3450 let last_commit = store
3451 .read_last_commit()
3452 .unwrap()
3453 .expect("Last commit should be set");
3454 assert_eq!(last_commit.reference().index, 4);
3455
3456 println!(
3457 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3458 );
3459
3460 let certified_commits = sub_dags_and_commits
3462 .iter()
3463 .skip(3)
3464 .take(5)
3465 .map(|(_, c)| c.clone())
3466 .collect::<Vec<_>>();
3467
3468 let blocks = dag_builder.blocks(8..=12);
3470 for block in blocks {
3471 core.dag_state.write().accept_block(block);
3472 }
3473
3474 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3476 .expect("Should not fail");
3477
3478 core.dag_state.write().flush();
3480
3481 let commits = store.scan_commits((6..=10).into()).unwrap();
3482
3483 assert_eq!(commits.len(), 5);
3485
3486 for i in 6..=10 {
3487 let commit = &commits[i - 6];
3488 assert_eq!(commit.reference().index, i as u32);
3489 }
3490 }
3491
3492 #[tokio::test]
3493 async fn try_commit_with_certified_commits_gced_blocks() {
3494 const GC_DEPTH: u32 = 3;
3495 telemetry_subscribers::init_for_testing();
3496
3497 let (mut context, mut key_pairs) = Context::new_for_test(5);
3498 context
3499 .protocol_config
3500 .set_consensus_gc_depth_for_testing(GC_DEPTH);
3501 let context = Arc::new(context.with_parameters(Parameters {
3502 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3503 ..Default::default()
3504 }));
3505
3506 let store = Arc::new(MemStore::new());
3507 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3508
3509 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3510 let leader_schedule = Arc::new(
3511 LeaderSchedule::from_store(context.clone(), dag_state.clone())
3512 .with_num_commits_per_schedule(10),
3513 );
3514
3515 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3516 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3517 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3518 let (blocks_sender, _blocks_receiver) =
3519 monitored_mpsc::unbounded_channel("consensus_block_output");
3520 let transaction_certifier = TransactionCertifier::new(
3521 context.clone(),
3522 Arc::new(NoopBlockVerifier {}),
3523 dag_state.clone(),
3524 blocks_sender,
3525 );
3526 let _block_receiver = signal_receivers.block_broadcast_receiver();
3528
3529 let (commit_consumer, _commit_receiver, _transaction_receiver) =
3530 CommitConsumerArgs::new(0, 0);
3531 let commit_observer = CommitObserver::new(
3532 context.clone(),
3533 commit_consumer,
3534 dag_state.clone(),
3535 transaction_certifier.clone(),
3536 leader_schedule.clone(),
3537 )
3538 .await;
3539
3540 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3541 let mut core = Core::new(
3542 context.clone(),
3543 leader_schedule,
3544 transaction_consumer,
3545 transaction_certifier.clone(),
3546 block_manager,
3547 commit_observer,
3548 signals,
3549 key_pairs.remove(context.own_index.value()).1,
3550 dag_state.clone(),
3551 true,
3552 round_tracker,
3553 );
3554
3555 assert_eq!(
3557 core.last_proposed_round(),
3558 GENESIS_ROUND,
3559 "No block should have been created other than genesis"
3560 );
3561
3562 let dag_str = "DAG {
3563 Round 0 : { 5 },
3564 Round 1 : { * },
3565 Round 2 : {
3566 A -> [-E1],
3567 B -> [-E1],
3568 C -> [-E1],
3569 D -> [-E1],
3570 },
3571 Round 3 : {
3572 A -> [*],
3573 B -> [*],
3574 C -> [*],
3575 D -> [*],
3576 },
3577 Round 4 : {
3578 A -> [*],
3579 B -> [*],
3580 C -> [*],
3581 D -> [*],
3582 },
3583 Round 5 : {
3584 A -> [*],
3585 B -> [*],
3586 C -> [*],
3587 D -> [*],
3588 E -> [A4, B4, C4, D4, E1]
3589 },
3590 Round 6 : { * },
3591 Round 7 : { * },
3592 }";
3593
3594 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3595 dag_builder.print();
3596
3597 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3599 .get_sub_dag_and_certified_commits(1..=5)
3600 .into_iter()
3601 .unzip();
3602
3603 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3606
3607 assert_eq!(committed_sub_dags.len(), 4);
3609 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3610 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3611
3612 for block in committed_sub_dag.blocks.iter() {
3614 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3615 panic!("Did not expect to commit block E1");
3616 }
3617 }
3618 }
3619 }
3620
3621 #[tokio::test(flavor = "current_thread", start_paused = true)]
3622 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3623 parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3624 }
3625
3626 #[tokio::test(flavor = "current_thread", start_paused = true)]
3627 async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3628 parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3629 }
3630
3631 async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3632 num_leaders_per_round: Option<usize>,
3633 ) {
3634 telemetry_subscribers::init_for_testing();
3635 let default_params = Parameters::default();
3636
3637 let (mut context, _) = Context::new_for_test(6);
3638 context
3639 .protocol_config
3640 .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3641 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3643
3644 let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3646 for round in 1..=33 {
3647 let mut this_round_blocks = Vec::new();
3648
3649 sleep(default_params.min_round_delay).await;
3651
3652 for core_fixture in &mut cores {
3653 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3656
3657 core_fixture.core.round_tracker.write().update_from_probe(
3658 vec![
3659 vec![round, round, round, round, round, round],
3660 vec![round, round, round, round, round, round],
3661 vec![round, round, round, round, round, round],
3662 vec![round, round, round, round, round, round],
3663 vec![round, round, round, round, round, round],
3664 vec![round, round, round, round, round, round],
3665 ],
3666 vec![
3667 vec![round, round, round, round, round, round],
3668 vec![round, round, round, round, round, round],
3669 vec![round, round, round, round, round, round],
3670 vec![round, round, round, round, round, round],
3671 vec![round, round, round, round, round, round],
3672 vec![round, round, round, round, round, round],
3673 ],
3674 );
3675
3676 let new_round = receive(
3678 Duration::from_secs(1),
3679 core_fixture.signal_receivers.new_round_receiver(),
3680 )
3681 .await;
3682 assert_eq!(new_round, round);
3683
3684 let extended_block = tokio::time::timeout(
3686 Duration::from_secs(1),
3687 core_fixture.block_receiver.recv(),
3688 )
3689 .await
3690 .unwrap()
3691 .unwrap();
3692 assert_eq!(extended_block.block.round(), round);
3693 assert_eq!(
3694 extended_block.block.author(),
3695 core_fixture.core.context.own_index
3696 );
3697
3698 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3700
3701 let block = core_fixture.core.last_proposed_block();
3702
3703 assert_eq!(
3705 block.ancestors().len(),
3706 core_fixture.core.context.committee.size()
3707 );
3708 for ancestor in block.ancestors() {
3709 if block.round() > 1 {
3710 assert!(
3712 last_round_blocks
3713 .iter()
3714 .any(|block| block.reference() == *ancestor),
3715 "Reference from previous round should be added"
3716 );
3717 }
3718 }
3719 }
3720
3721 last_round_blocks = this_round_blocks;
3722 }
3723
3724 for core_fixture in cores {
3725 let expected_commit_count = match num_leaders_per_round {
3737 Some(1) => 30,
3738 _ => 31,
3739 };
3740
3741 core_fixture.dag_state.write().flush();
3743
3744 let last_commit = core_fixture
3746 .store
3747 .read_last_commit()
3748 .unwrap()
3749 .expect("last commit should be set");
3750 assert_eq!(last_commit.index(), expected_commit_count);
3751 let all_stored_commits = core_fixture
3752 .store
3753 .scan_commits((0..=CommitIndex::MAX).into())
3754 .unwrap();
3755 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3756 assert_eq!(
3757 core_fixture
3758 .core
3759 .leader_schedule
3760 .leader_swap_table
3761 .read()
3762 .bad_nodes
3763 .len(),
3764 1
3765 );
3766 assert_eq!(
3767 core_fixture
3768 .core
3769 .leader_schedule
3770 .leader_swap_table
3771 .read()
3772 .good_nodes
3773 .len(),
3774 1
3775 );
3776 let expected_reputation_scores =
3777 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3778 assert_eq!(
3779 core_fixture
3780 .core
3781 .leader_schedule
3782 .leader_swap_table
3783 .read()
3784 .reputation_scores,
3785 expected_reputation_scores
3786 );
3787 }
3788 }
3789
3790 #[tokio::test]
3791 async fn test_core_signals() {
3792 telemetry_subscribers::init_for_testing();
3793 let default_params = Parameters::default();
3794
3795 let (context, _) = Context::new_for_test(4);
3796 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3798
3799 let mut last_round_blocks = Vec::new();
3801 for round in 1..=10 {
3802 let mut this_round_blocks = Vec::new();
3803
3804 sleep(default_params.min_round_delay).await;
3806
3807 for core_fixture in &mut cores {
3808 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3811
3812 core_fixture.core.round_tracker.write().update_from_probe(
3813 vec![
3814 vec![round, round, round, round],
3815 vec![round, round, round, round],
3816 vec![round, round, round, round],
3817 vec![round, round, round, round],
3818 ],
3819 vec![
3820 vec![round, round, round, round],
3821 vec![round, round, round, round],
3822 vec![round, round, round, round],
3823 vec![round, round, round, round],
3824 ],
3825 );
3826
3827 let new_round = receive(
3829 Duration::from_secs(1),
3830 core_fixture.signal_receivers.new_round_receiver(),
3831 )
3832 .await;
3833 assert_eq!(new_round, round);
3834
3835 let extended_block = tokio::time::timeout(
3837 Duration::from_secs(1),
3838 core_fixture.block_receiver.recv(),
3839 )
3840 .await
3841 .unwrap()
3842 .unwrap();
3843 assert_eq!(extended_block.block.round(), round);
3844 assert_eq!(
3845 extended_block.block.author(),
3846 core_fixture.core.context.own_index
3847 );
3848
3849 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3851
3852 let block = core_fixture.core.last_proposed_block();
3853
3854 assert_eq!(
3856 block.ancestors().len(),
3857 core_fixture.core.context.committee.size()
3858 );
3859 for ancestor in block.ancestors() {
3860 if block.round() > 1 {
3861 assert!(
3863 last_round_blocks
3864 .iter()
3865 .any(|block| block.reference() == *ancestor),
3866 "Reference from previous round should be added"
3867 );
3868 }
3869 }
3870 }
3871
3872 last_round_blocks = this_round_blocks;
3873 }
3874
3875 for core_fixture in cores {
3876 core_fixture.dag_state.write().flush();
3878 let last_commit = core_fixture
3880 .store
3881 .read_last_commit()
3882 .unwrap()
3883 .expect("last commit should be set");
3884 assert_eq!(last_commit.index(), 7);
3888 let all_stored_commits = core_fixture
3889 .store
3890 .scan_commits((0..=CommitIndex::MAX).into())
3891 .unwrap();
3892 assert_eq!(all_stored_commits.len(), 7);
3893 }
3894 }
3895
3896 #[tokio::test]
3897 async fn test_core_compress_proposal_references() {
3898 telemetry_subscribers::init_for_testing();
3899 let default_params = Parameters::default();
3900
3901 let (context, _) = Context::new_for_test(4);
3902 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3904
3905 let mut last_round_blocks = Vec::new();
3906 let mut all_blocks = Vec::new();
3907
3908 let excluded_authority = AuthorityIndex::new_for_test(3);
3909
3910 for round in 1..=10 {
3911 let mut this_round_blocks = Vec::new();
3912
3913 for core_fixture in &mut cores {
3914 if core_fixture.core.context.own_index == excluded_authority {
3916 continue;
3917 }
3918
3919 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3921 core_fixture.core.round_tracker.write().update_from_probe(
3922 vec![
3923 vec![round, round, round, round],
3924 vec![round, round, round, round],
3925 vec![round, round, round, round],
3926 vec![round, round, round, round],
3927 ],
3928 vec![
3929 vec![round, round, round, round],
3930 vec![round, round, round, round],
3931 vec![round, round, round, round],
3932 vec![round, round, round, round],
3933 ],
3934 );
3935 core_fixture.core.new_block(round, true).unwrap();
3936
3937 let block = core_fixture.core.last_proposed_block();
3938 assert_eq!(block.round(), round);
3939
3940 this_round_blocks.push(block.clone());
3942 }
3943
3944 last_round_blocks = this_round_blocks.clone();
3945 all_blocks.extend(this_round_blocks);
3946 }
3947
3948 let core_fixture = &mut cores[excluded_authority];
3952 sleep(default_params.min_round_delay).await;
3954 core_fixture.add_blocks(all_blocks).unwrap();
3956
3957 let block = core_fixture.core.last_proposed_block();
3960 assert_eq!(block.round(), 11);
3961 assert_eq!(block.ancestors().len(), 4);
3962 for block_ref in block.ancestors() {
3963 if block_ref.author == excluded_authority {
3964 assert_eq!(block_ref.round, 1);
3965 } else {
3966 assert_eq!(block_ref.round, 10);
3967 }
3968 }
3969
3970 core_fixture.dag_state.write().flush();
3972
3973 let last_commit = core_fixture
3975 .store
3976 .read_last_commit()
3977 .unwrap()
3978 .expect("last commit should be set");
3979 assert_eq!(last_commit.index(), 6);
3983 let all_stored_commits = core_fixture
3984 .store
3985 .scan_commits((0..=CommitIndex::MAX).into())
3986 .unwrap();
3987 assert_eq!(all_stored_commits.len(), 6);
3988 }
3989
3990 #[tokio::test]
3991 async fn try_select_certified_leaders() {
3992 telemetry_subscribers::init_for_testing();
3994
3995 let (context, _) = Context::new_for_test(4);
3996
3997 let authority_index = AuthorityIndex::new_for_test(0);
3998 let core =
3999 CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
4000 let mut core = core.core;
4001
4002 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4003 dag_builder.layers(1..=12).build();
4004
4005 let limit = 2;
4006
4007 let blocks = dag_builder.blocks(1..=12);
4008
4009 for block in blocks {
4010 core.dag_state.write().accept_block(block);
4011 }
4012
4013 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4015 let mut certified_commits = sub_dags_and_commits
4016 .into_iter()
4017 .map(|(_, commit)| commit)
4018 .collect::<Vec<_>>();
4019
4020 let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
4021
4022 assert_eq!(leaders.len(), 2);
4024 assert_eq!(certified_commits.len(), 2);
4025 }
4026
4027 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4028 tokio::time::timeout(timeout, receiver.changed())
4029 .await
4030 .expect("Timeout while waiting to read from receiver")
4031 .expect("Signal receive channel shouldn't be closed");
4032 *receiver.borrow_and_update()
4033 }
4034}