1use std::{
5 collections::{BTreeMap, BTreeSet},
6 iter,
7 sync::Arc,
8 time::Duration,
9 vec,
10};
11
12use consensus_config::{AuthorityIndex, ProtocolKeyPair};
13#[cfg(test)]
14use consensus_config::{Stake, local_committee_and_keys};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
16use itertools::Itertools as _;
17#[cfg(test)]
18use mysten_metrics::monitored_mpsc::UnboundedReceiver;
19use mysten_metrics::monitored_scope;
20use parking_lot::RwLock;
21use sui_macros::fail_point;
22use tokio::{
23 sync::{broadcast, watch},
24 time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30 CommitConsumerArgs, TransactionClient, block::CertifiedBlocksOutput,
31 block_verifier::NoopBlockVerifier, storage::mem_store::MemStore,
32};
33use crate::{
34 ancestor::{AncestorState, AncestorStateManager},
35 block::{
36 Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
37 VerifiedBlock,
38 },
39 block_manager::BlockManager,
40 commit::{
41 CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42 },
43 commit_observer::CommitObserver,
44 context::Context,
45 dag_state::DagState,
46 error::{ConsensusError, ConsensusResult},
47 leader_schedule::LeaderSchedule,
48 round_tracker::PeerRoundTracker,
49 stake_aggregator::{QuorumThreshold, StakeAggregator},
50 transaction::TransactionConsumer,
51 transaction_certifier::TransactionCertifier,
52 universal_committer::{
53 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
54 },
55};
56
57const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
60
61pub(crate) struct Core {
62 context: Arc<Context>,
63 transaction_consumer: TransactionConsumer,
65 transaction_certifier: TransactionCertifier,
67 block_manager: BlockManager,
70 subscriber_exists: bool,
74 propagation_delay: Round,
81 committer: UniversalCommitter,
83 last_signaled_round: Round,
85 last_included_ancestors: Vec<Option<BlockRef>>,
89 last_decided_leader: Slot,
94 leader_schedule: Arc<LeaderSchedule>,
97 commit_observer: CommitObserver,
100 signals: CoreSignals,
102 block_signer: ProtocolKeyPair,
104 dag_state: Arc<RwLock<DagState>>,
106 last_known_proposed_round: Option<Round>,
110 ancestor_state_manager: AncestorStateManager,
115 round_tracker: Arc<RwLock<PeerRoundTracker>>,
120}
121
122impl Core {
123 pub(crate) fn new(
124 context: Arc<Context>,
125 leader_schedule: Arc<LeaderSchedule>,
126 transaction_consumer: TransactionConsumer,
127 transaction_certifier: TransactionCertifier,
128 block_manager: BlockManager,
129 subscriber_exists: bool,
130 commit_observer: CommitObserver,
131 signals: CoreSignals,
132 block_signer: ProtocolKeyPair,
133 dag_state: Arc<RwLock<DagState>>,
134 sync_last_known_own_block: bool,
135 round_tracker: Arc<RwLock<PeerRoundTracker>>,
136 ) -> Self {
137 let last_decided_leader = dag_state.read().last_commit_leader();
138 let number_of_leaders = context
139 .protocol_config
140 .mysticeti_num_leaders_per_round()
141 .unwrap_or(1);
142 let committer = UniversalCommitterBuilder::new(
143 context.clone(),
144 leader_schedule.clone(),
145 dag_state.clone(),
146 )
147 .with_number_of_leaders(number_of_leaders)
148 .with_pipeline(true)
149 .build();
150
151 let last_proposed_block = dag_state.read().get_last_proposed_block();
152
153 let last_signaled_round = last_proposed_block.round();
154
155 let mut last_included_ancestors = vec![None; context.committee.size()];
164 for ancestor in last_proposed_block.ancestors() {
165 last_included_ancestors[ancestor.author] = Some(*ancestor);
166 }
167
168 let min_propose_round = if sync_last_known_own_block {
169 None
170 } else {
171 Some(0)
173 };
174
175 let propagation_scores = leader_schedule
176 .leader_swap_table
177 .read()
178 .reputation_scores
179 .clone();
180 let mut ancestor_state_manager =
181 AncestorStateManager::new(context.clone(), dag_state.clone());
182 ancestor_state_manager.set_propagation_scores(propagation_scores);
183
184 Self {
185 context,
186 last_signaled_round,
187 last_included_ancestors,
188 last_decided_leader,
189 leader_schedule,
190 transaction_consumer,
191 transaction_certifier,
192 block_manager,
193 subscriber_exists,
194 propagation_delay: 0,
195 committer,
196 commit_observer,
197 signals,
198 block_signer,
199 dag_state,
200 last_known_proposed_round: min_propose_round,
201 ancestor_state_manager,
202 round_tracker,
203 }
204 .recover()
205 }
206
207 fn recover(mut self) -> Self {
208 let _s = self
209 .context
210 .metrics
211 .node_metrics
212 .scope_processing_time
213 .with_label_values(&["Core::recover"])
214 .start_timer();
215
216 self.try_commit(vec![]).unwrap();
218
219 let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
220 {
221 last_proposed_block
222 } else {
223 let last_proposed_block = self.dag_state.read().get_last_proposed_block();
224
225 if self.should_propose() {
226 assert!(
227 last_proposed_block.round() > GENESIS_ROUND,
228 "At minimum a block of round higher than genesis should have been produced during recovery"
229 );
230 }
231
232 self.signals
234 .new_block(ExtendedBlock {
235 block: last_proposed_block.clone(),
236 excluded_ancestors: vec![],
237 })
238 .unwrap();
239 last_proposed_block
240 };
241
242 self.try_signal_new_round();
246
247 info!(
248 "Core recovery completed with last proposed block {:?}",
249 last_proposed_block
250 );
251
252 self
253 }
254
255 #[tracing::instrument(skip_all)]
259 pub(crate) fn add_blocks(
260 &mut self,
261 blocks: Vec<VerifiedBlock>,
262 ) -> ConsensusResult<BTreeSet<BlockRef>> {
263 let _scope = monitored_scope("Core::add_blocks");
264 let _s = self
265 .context
266 .metrics
267 .node_metrics
268 .scope_processing_time
269 .with_label_values(&["Core::add_blocks"])
270 .start_timer();
271 self.context
272 .metrics
273 .node_metrics
274 .core_add_blocks_batch_size
275 .observe(blocks.len() as f64);
276
277 let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
278
279 if !accepted_blocks.is_empty() {
280 trace!(
281 "Accepted blocks: {}",
282 accepted_blocks
283 .iter()
284 .map(|b| b.reference().to_string())
285 .join(",")
286 );
287
288 self.try_commit(vec![])?;
290
291 self.try_propose(false)?;
293
294 self.try_signal_new_round();
298 };
299
300 if !missing_block_refs.is_empty() {
301 trace!(
302 "Missing block refs: {}",
303 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
304 );
305 }
306
307 Ok(missing_block_refs)
308 }
309
310 #[tracing::instrument(skip_all)]
315 pub(crate) fn add_certified_commits(
316 &mut self,
317 certified_commits: CertifiedCommits,
318 ) -> ConsensusResult<BTreeSet<BlockRef>> {
319 let _scope = monitored_scope("Core::add_certified_commits");
320
321 let votes = certified_commits.votes().to_vec();
322 let commits = self
323 .filter_new_commits(certified_commits.commits().to_vec())
324 .expect("Certified commits validation failed");
325
326 let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);
330
331 self.try_commit(commits)?;
333
334 self.try_propose(false)?;
336
337 self.try_signal_new_round();
341
342 Ok(missing_block_refs)
343 }
344
345 pub(crate) fn check_block_refs(
348 &mut self,
349 block_refs: Vec<BlockRef>,
350 ) -> ConsensusResult<BTreeSet<BlockRef>> {
351 let _scope = monitored_scope("Core::check_block_refs");
352 let _s = self
353 .context
354 .metrics
355 .node_metrics
356 .scope_processing_time
357 .with_label_values(&["Core::check_block_refs"])
358 .start_timer();
359 self.context
360 .metrics
361 .node_metrics
362 .core_check_block_refs_batch_size
363 .observe(block_refs.len() as f64);
364
365 let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
367
368 if !missing_block_refs.is_empty() {
369 trace!(
370 "Missing block refs: {}",
371 missing_block_refs.iter().map(|b| b.to_string()).join(", ")
372 );
373 }
374 Ok(missing_block_refs)
375 }
376
377 fn try_signal_new_round(&mut self) {
379 let new_clock_round = self.dag_state.read().threshold_clock_round();
384 if new_clock_round <= self.last_signaled_round {
385 return;
386 }
387 self.signals.new_round(new_clock_round);
389 self.last_signaled_round = new_clock_round;
390
391 self.context
393 .metrics
394 .node_metrics
395 .threshold_clock_round
396 .set(new_clock_round as i64);
397 }
398
399 pub(crate) fn new_block(
403 &mut self,
404 round: Round,
405 force: bool,
406 ) -> ConsensusResult<Option<VerifiedBlock>> {
407 let _scope = monitored_scope("Core::new_block");
408 if self.last_proposed_round() < round {
409 self.context
410 .metrics
411 .node_metrics
412 .leader_timeout_total
413 .with_label_values(&[&format!("{force}")])
414 .inc();
415 let result = self.try_propose(force);
416 self.try_signal_new_round();
418 return result;
419 }
420 Ok(None)
421 }
422
423 fn filter_new_commits(
426 &mut self,
427 commits: Vec<CertifiedCommit>,
428 ) -> ConsensusResult<Vec<CertifiedCommit>> {
429 let last_commit_index = self.dag_state.read().last_commit_index();
431 let commits = commits
432 .iter()
433 .filter(|commit| {
434 if commit.index() > last_commit_index {
435 true
436 } else {
437 tracing::debug!(
438 "Skip commit for index {} as it is already committed with last commit index {}",
439 commit.index(),
440 last_commit_index
441 );
442 false
443 }
444 })
445 .cloned()
446 .collect::<Vec<_>>();
447
448 if let Some(commit) = commits.first()
450 && commit.index() != last_commit_index + 1
451 {
452 return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
453 expected_commit_index: last_commit_index + 1,
454 commit_index: commit.index(),
455 });
456 }
457
458 Ok(commits)
459 }
460
461 fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
465 if !self.should_propose() {
466 return Ok(None);
467 }
468 if let Some(extended_block) = self.try_new_block(force) {
469 self.signals.new_block(extended_block.clone())?;
470
471 fail_point!("consensus-after-propose");
472
473 self.try_commit(vec![])?;
475 return Ok(Some(extended_block.block));
476 }
477 Ok(None)
478 }
479
480 fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
483 let _s = self
484 .context
485 .metrics
486 .node_metrics
487 .scope_processing_time
488 .with_label_values(&["Core::try_new_block"])
489 .start_timer();
490
491 let clock_round = {
493 let dag_state = self.dag_state.read();
494 let clock_round = dag_state.threshold_clock_round();
495 if clock_round <= dag_state.get_last_proposed_block().round() {
496 debug!(
497 "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
498 clock_round,
499 dag_state.get_last_proposed_block().round()
500 );
501 return None;
502 }
503 clock_round
504 };
505
506 let quorum_round = clock_round.saturating_sub(1);
508
509 if !force {
512 if !self.leaders_exist(quorum_round) {
513 return None;
514 }
515
516 if Duration::from_millis(
517 self.context
518 .clock
519 .timestamp_utc_ms()
520 .saturating_sub(self.last_proposed_timestamp_ms()),
521 ) < self.context.parameters.min_round_delay
522 {
523 debug!(
524 "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
525 clock_round,
526 self.last_proposed_timestamp_ms(),
527 self.context.parameters.min_round_delay.as_millis(),
528 );
529 return None;
530 }
531 }
532
533 let (ancestors, excluded_and_equivocating_ancestors) =
535 self.smart_ancestors_to_propose(clock_round, !force);
536
537 if ancestors.is_empty() {
539 assert!(
540 !force,
541 "Ancestors should have been returned if force is true!"
542 );
543 debug!(
544 "Skipping block proposal for round {} because no good ancestor is found",
545 clock_round,
546 );
547 return None;
548 }
549
550 let excluded_ancestors_limit = self.context.committee.size() * 2;
551 if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
552 debug!(
553 "Dropping {} excluded ancestor(s) during proposal due to size limit",
554 excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
555 );
556 }
557 let excluded_ancestors = excluded_and_equivocating_ancestors
558 .into_iter()
559 .take(excluded_ancestors_limit)
560 .collect();
561
562 for ancestor in &ancestors {
564 self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
565 }
566
567 let leader_authority = &self
568 .context
569 .committee
570 .authority(self.first_leader(quorum_round))
571 .hostname;
572 self.context
573 .metrics
574 .node_metrics
575 .block_proposal_leader_wait_ms
576 .with_label_values(&[leader_authority])
577 .inc_by(
578 Instant::now()
579 .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
580 .as_millis() as u64,
581 );
582 self.context
583 .metrics
584 .node_metrics
585 .block_proposal_leader_wait_count
586 .with_label_values(&[leader_authority])
587 .inc();
588
589 self.context
590 .metrics
591 .node_metrics
592 .proposed_block_ancestors
593 .observe(ancestors.len() as f64);
594 for ancestor in &ancestors {
595 let authority = &self.context.committee.authority(ancestor.author()).hostname;
596 self.context
597 .metrics
598 .node_metrics
599 .proposed_block_ancestors_depth
600 .with_label_values(&[authority])
601 .observe(clock_round.saturating_sub(ancestor.round()).into());
602 }
603
604 let now = self.context.clock.timestamp_utc_ms();
605 ancestors.iter().for_each(|block| {
606 if block.timestamp_ms() > now {
607 trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
608 let authority = &self.context.committee.authority(block.author()).hostname;
609 self.context
610 .metrics
611 .node_metrics
612 .proposed_block_ancestors_timestamp_drift_ms
613 .with_label_values(&[authority])
614 .inc_by(block.timestamp_ms().saturating_sub(now));
615 }
616 });
617
618 let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
621 self.context
622 .metrics
623 .node_metrics
624 .proposed_block_transactions
625 .observe(transactions.len() as f64);
626
627 let commit_votes = self
629 .dag_state
630 .write()
631 .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
632
633 let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
634 let hard_linked_ancestors = {
635 let mut dag_state = self.dag_state.write();
636 ancestors
637 .iter()
638 .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
639 .collect()
640 };
641 self.transaction_certifier
642 .get_own_votes(hard_linked_ancestors)
643 } else {
644 vec![]
645 };
646
647 let block = if self.context.protocol_config.mysticeti_fastpath() {
649 Block::V2(BlockV2::new(
650 self.context.committee.epoch(),
651 clock_round,
652 self.context.own_index,
653 now,
654 ancestors.iter().map(|b| b.reference()).collect(),
655 transactions,
656 commit_votes,
657 transaction_votes,
658 vec![],
659 ))
660 } else {
661 Block::V1(BlockV1::new(
662 self.context.committee.epoch(),
663 clock_round,
664 self.context.own_index,
665 now,
666 ancestors.iter().map(|b| b.reference()).collect(),
667 transactions,
668 commit_votes,
669 vec![],
670 ))
671 };
672 let signed_block =
673 SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
674 let serialized = signed_block
675 .serialize()
676 .expect("Block serialization failed.");
677 self.context
678 .metrics
679 .node_metrics
680 .proposed_block_size
681 .observe(serialized.len() as f64);
682 let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
684
685 let last_proposed_block = self.last_proposed_block();
687 if last_proposed_block.round() > 0 {
688 self.context
689 .metrics
690 .node_metrics
691 .block_proposal_interval
692 .observe(
693 Duration::from_millis(
694 verified_block
695 .timestamp_ms()
696 .saturating_sub(last_proposed_block.timestamp_ms()),
697 )
698 .as_secs_f64(),
699 );
700 }
701
702 let (accepted_blocks, missing) = self
704 .block_manager
705 .try_accept_blocks(vec![verified_block.clone()]);
706 assert_eq!(accepted_blocks.len(), 1);
707 assert!(missing.is_empty());
708
709 if self.context.protocol_config.mysticeti_fastpath() {
713 self.transaction_certifier
714 .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
715 self.dag_state
716 .write()
717 .link_causal_history(verified_block.reference());
718 }
719
720 self.dag_state.write().flush();
722
723 ack_transactions(verified_block.reference());
725
726 info!("Created block {verified_block:?} for round {clock_round}");
727
728 self.context
729 .metrics
730 .node_metrics
731 .proposed_blocks
732 .with_label_values(&[&force.to_string()])
733 .inc();
734
735 let extended_block = ExtendedBlock {
736 block: verified_block,
737 excluded_ancestors,
738 };
739
740 self.round_tracker
742 .write()
743 .update_from_accepted_block(&extended_block);
744
745 Some(extended_block)
746 }
747
748 fn try_commit(
751 &mut self,
752 mut certified_commits: Vec<CertifiedCommit>,
753 ) -> ConsensusResult<Vec<CommittedSubDag>> {
754 let _s = self
755 .context
756 .metrics
757 .node_metrics
758 .scope_processing_time
759 .with_label_values(&["Core::try_commit"])
760 .start_timer();
761
762 let mut certified_commits_map = BTreeMap::new();
763 for c in &certified_commits {
764 certified_commits_map.insert(c.index(), c.reference());
765 }
766
767 if !certified_commits.is_empty() {
768 info!(
769 "Processing synced commits: {:?}",
770 certified_commits
771 .iter()
772 .map(|c| (c.index(), c.leader()))
773 .collect::<Vec<_>>()
774 );
775 }
776
777 let mut committed_sub_dags = Vec::new();
778 loop {
780 let mut commits_until_update = self
785 .leader_schedule
786 .commits_until_leader_schedule_update(self.dag_state.clone());
787
788 if commits_until_update == 0 {
789 let last_commit_index = self.dag_state.read().last_commit_index();
790
791 tracing::info!(
792 "Leader schedule change triggered at commit index {last_commit_index}"
793 );
794
795 self.leader_schedule
796 .update_leader_schedule_v2(&self.dag_state);
797
798 let propagation_scores = self
799 .leader_schedule
800 .leader_swap_table
801 .read()
802 .reputation_scores
803 .clone();
804 self.ancestor_state_manager
805 .set_propagation_scores(propagation_scores);
806
807 commits_until_update = self
808 .leader_schedule
809 .commits_until_leader_schedule_update(self.dag_state.clone());
810
811 fail_point!("consensus-after-leader-schedule-change");
812 }
813 assert!(commits_until_update > 0);
814
815 let (certified_leaders, decided_certified_commits): (
818 Vec<DecidedLeader>,
819 Vec<CertifiedCommit>,
820 ) = self
821 .try_select_certified_leaders(&mut certified_commits, commits_until_update)
822 .into_iter()
823 .unzip();
824
825 let blocks = decided_certified_commits
832 .iter()
833 .flat_map(|c| c.blocks())
834 .cloned()
835 .collect::<Vec<_>>();
836 self.block_manager.try_accept_committed_blocks(blocks);
837
838 let (decided_leaders, local) = if certified_leaders.is_empty() {
840 let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
842 if decided_leaders.len() >= commits_until_update {
844 let _ = decided_leaders.split_off(commits_until_update);
845 }
846 (decided_leaders, true)
847 } else {
848 (certified_leaders, false)
849 };
850
851 let Some(last_decided) = decided_leaders.last().cloned() else {
853 break;
854 };
855
856 self.last_decided_leader = last_decided.slot();
857 self.context
858 .metrics
859 .node_metrics
860 .last_decided_leader_round
861 .set(self.last_decided_leader.round as i64);
862
863 let sequenced_leaders = decided_leaders
864 .into_iter()
865 .filter_map(|leader| leader.into_committed_block())
866 .collect::<Vec<_>>();
867 if sequenced_leaders.is_empty() {
870 break;
871 }
872 tracing::info!(
873 "Committing {} leaders: {}; {} commits before next leader schedule change",
874 sequenced_leaders.len(),
875 sequenced_leaders
876 .iter()
877 .map(|b| b.reference().to_string())
878 .join(","),
879 commits_until_update,
880 );
881
882 let subdags = self
884 .commit_observer
885 .handle_commit(sequenced_leaders, local)?;
886
887 self.block_manager
889 .try_unsuspend_blocks_for_latest_gc_round();
890
891 committed_sub_dags.extend(subdags);
892
893 fail_point!("consensus-after-handle-commit");
894 }
895
896 for sub_dag in &committed_sub_dags {
898 if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
899 assert_eq!(
900 commit_ref, sub_dag.commit_ref,
901 "Certified commit has different reference than the committed sub dag"
902 );
903 }
904 }
905
906 let committed_block_refs = committed_sub_dags
908 .iter()
909 .flat_map(|sub_dag| sub_dag.blocks.iter())
910 .filter_map(|block| {
911 (block.author() == self.context.own_index).then_some(block.reference())
912 })
913 .collect::<Vec<_>>();
914 self.transaction_consumer
915 .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
916
917 Ok(committed_sub_dags)
918 }
919
920 pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
921 let _scope = monitored_scope("Core::get_missing_blocks");
922 self.block_manager.missing_blocks()
923 }
924
925 pub(crate) fn set_subscriber_exists(&mut self, exists: bool) {
927 info!("Block subscriber exists: {exists}");
928 self.subscriber_exists = exists;
929 }
930
931 pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
933 info!("Propagation round delay set to: {delay}");
934 self.propagation_delay = delay;
935 }
936
937 pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
941 if self.last_known_proposed_round.is_some() {
942 panic!(
943 "Should not attempt to set the last known proposed round if that has been already set"
944 );
945 }
946 self.last_known_proposed_round = Some(round);
947 info!("Last known proposed round set to {round}");
948 }
949
950 pub(crate) fn should_propose(&self) -> bool {
952 let clock_round = self.dag_state.read().threshold_clock_round();
953 let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
954
955 if !self.subscriber_exists {
956 debug!("Skip proposing for round {clock_round}, no subscriber exists.");
957 core_skipped_proposals
958 .with_label_values(&["no_subscriber"])
959 .inc();
960 return false;
961 }
962
963 if self.propagation_delay
964 > self
965 .context
966 .parameters
967 .propagation_delay_stop_proposal_threshold
968 {
969 debug!(
970 "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
971 self.propagation_delay,
972 self.context
973 .parameters
974 .propagation_delay_stop_proposal_threshold
975 );
976 core_skipped_proposals
977 .with_label_values(&["high_propagation_delay"])
978 .inc();
979 return false;
980 }
981
982 let Some(last_known_proposed_round) = self.last_known_proposed_round else {
983 debug!(
984 "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
985 );
986 core_skipped_proposals
987 .with_label_values(&["no_last_known_proposed_round"])
988 .inc();
989 return false;
990 };
991 if clock_round <= last_known_proposed_round {
992 debug!(
993 "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
994 );
995 core_skipped_proposals
996 .with_label_values(&["higher_last_known_proposed_round"])
997 .inc();
998 return false;
999 }
1000
1001 true
1002 }
1003
1004 #[tracing::instrument(skip_all)]
1010 fn try_select_certified_leaders(
1011 &mut self,
1012 certified_commits: &mut Vec<CertifiedCommit>,
1013 limit: usize,
1014 ) -> Vec<(DecidedLeader, CertifiedCommit)> {
1015 assert!(limit > 0, "limit should be greater than 0");
1016 if certified_commits.is_empty() {
1017 return vec![];
1018 }
1019
1020 let to_commit = if certified_commits.len() >= limit {
1021 certified_commits.drain(..limit).collect::<Vec<_>>()
1023 } else {
1024 std::mem::take(certified_commits)
1026 };
1027
1028 tracing::debug!(
1029 "Selected {} certified leaders: {}",
1030 to_commit.len(),
1031 to_commit.iter().map(|c| c.leader().to_string()).join(",")
1032 );
1033
1034 to_commit
1035 .into_iter()
1036 .map(|commit| {
1037 let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1038 assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1039 let leader = DecidedLeader::Commit(leader.clone(), false);
1041 UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1042 (leader, commit)
1043 })
1044 .collect::<Vec<_>>()
1045 }
1046
1047 fn smart_ancestors_to_propose(
1051 &mut self,
1052 clock_round: Round,
1053 smart_select: bool,
1054 ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1055 let node_metrics = &self.context.metrics.node_metrics;
1056 let _s = node_metrics
1057 .scope_processing_time
1058 .with_label_values(&["Core::smart_ancestors_to_propose"])
1059 .start_timer();
1060
1061 let all_ancestors = self
1063 .dag_state
1064 .read()
1065 .get_last_cached_block_per_authority(clock_round);
1066
1067 assert_eq!(
1068 all_ancestors.len(),
1069 self.context.committee.size(),
1070 "Fatal error, number of returned ancestors don't match committee size."
1071 );
1072
1073 let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();
1075
1076 self.ancestor_state_manager
1077 .update_all_ancestors_state(&accepted_quorum_rounds);
1078
1079 let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1080
1081 let quorum_round = clock_round.saturating_sub(1);
1082
1083 let mut score_and_pending_excluded_ancestors = Vec::new();
1084 let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1085
1086 let included_ancestors = iter::once(self.last_proposed_block().clone())
1091 .chain(
1092 all_ancestors
1093 .into_iter()
1094 .flat_map(|(ancestor, equivocating_ancestors)| {
1095 if ancestor.author() == self.context.own_index {
1096 return None;
1097 }
1098 if let Some(last_block_ref) =
1099 self.last_included_ancestors[ancestor.author()]
1100 && last_block_ref.round >= ancestor.round() {
1101 return None;
1102 }
1103
1104 excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1106
1107 let ancestor_state = ancestor_state_map[ancestor.author()];
1108 match ancestor_state {
1109 AncestorState::Include => {
1110 trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1111 }
1112 AncestorState::Exclude(score) => {
1113 trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1114 score_and_pending_excluded_ancestors.push((score, ancestor));
1115 return None;
1116 }
1117 }
1118
1119 Some(ancestor)
1120 }),
1121 )
1122 .collect::<Vec<_>>();
1123
1124 let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1125
1126 for ancestor in included_ancestors
1128 .iter()
1129 .filter(|a| a.round() == quorum_round)
1130 {
1131 parent_round_quorum.add(ancestor.author(), &self.context.committee);
1132 }
1133
1134 if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1135 node_metrics.smart_selection_wait.inc();
1136 debug!(
1137 "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1138 parent_round_quorum.stake()
1139 );
1140 return (vec![], BTreeSet::new());
1141 }
1142
1143 score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1146
1147 let mut ancestors_to_propose = included_ancestors;
1148 let mut excluded_ancestors = Vec::new();
1149 for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1150 let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1151 if !parent_round_quorum.reached_threshold(&self.context.committee)
1152 && ancestor.round() == quorum_round
1153 {
1154 debug!(
1155 "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1156 );
1157 parent_round_quorum.add(ancestor.author(), &self.context.committee);
1158 ancestors_to_propose.push(ancestor);
1159 node_metrics
1160 .included_excluded_proposal_ancestors_count_by_authority
1161 .with_label_values(&[block_hostname, "timeout"])
1162 .inc();
1163 } else {
1164 excluded_ancestors.push((score, ancestor));
1165 }
1166 }
1167
1168 for (score, ancestor) in excluded_ancestors.iter() {
1173 let excluded_author = ancestor.author();
1174 let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1175 let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
1177 accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1181
1182 let last_included_round = self.last_included_ancestors[excluded_author]
1183 .map(|block_ref| block_ref.round)
1184 .unwrap_or(GENESIS_ROUND);
1185 if ancestor.round() <= last_included_round {
1186 continue;
1189 }
1190
1191 if last_included_round >= accepted_low_quorum_round {
1192 excluded_and_equivocating_ancestors.insert(ancestor.reference());
1193 trace!(
1194 "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1195 ancestor.reference()
1196 );
1197 node_metrics
1198 .excluded_proposal_ancestors_count_by_authority
1199 .with_label_values(&[block_hostname])
1200 .inc();
1201 continue;
1202 }
1203
1204 let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1205 ancestor.clone()
1207 } else {
1208 excluded_and_equivocating_ancestors.insert(ancestor.reference());
1210 trace!(
1211 "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1212 ancestor.reference(),
1213 ancestor.round()
1214 );
1215 node_metrics
1216 .excluded_proposal_ancestors_count_by_authority
1217 .with_label_values(&[block_hostname])
1218 .inc();
1219
1220 match self.dag_state.read().get_last_cached_block_in_range(
1226 excluded_author,
1227 last_included_round + 1,
1228 accepted_low_quorum_round + 1,
1229 ) {
1230 Some(earlier_ancestor) => {
1231 earlier_ancestor
1233 }
1234 None => {
1235 continue;
1237 }
1238 }
1239 };
1240 self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1241 ancestors_to_propose.push(ancestor.clone());
1242 trace!(
1243 "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1244 ancestor.reference()
1245 );
1246 node_metrics
1247 .included_excluded_proposal_ancestors_count_by_authority
1248 .with_label_values(&[block_hostname, "quorum"])
1249 .inc();
1250 }
1251
1252 assert!(
1253 parent_round_quorum.reached_threshold(&self.context.committee),
1254 "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1255 );
1256
1257 debug!(
1258 "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1259 ancestors_to_propose.len(),
1260 excluded_and_equivocating_ancestors.len()
1261 );
1262
1263 (ancestors_to_propose, excluded_and_equivocating_ancestors)
1264 }
1265
1266 fn leaders_exist(&self, round: Round) -> bool {
1270 let dag_state = self.dag_state.read();
1271 for leader in self.leaders(round) {
1272 if !dag_state.contains_cached_block_at_slot(leader) {
1276 return false;
1277 }
1278 }
1279
1280 true
1281 }
1282
1283 fn leaders(&self, round: Round) -> Vec<Slot> {
1285 self.committer
1286 .get_leaders(round)
1287 .into_iter()
1288 .map(|authority_index| Slot::new(round, authority_index))
1289 .collect()
1290 }
1291
1292 fn first_leader(&self, round: Round) -> AuthorityIndex {
1294 self.leaders(round).first().unwrap().authority
1295 }
1296
1297 fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1298 self.last_proposed_block().timestamp_ms()
1299 }
1300
1301 fn last_proposed_round(&self) -> Round {
1302 self.last_proposed_block().round()
1303 }
1304
1305 fn last_proposed_block(&self) -> VerifiedBlock {
1306 self.dag_state.read().get_last_proposed_block()
1307 }
1308}
1309
1310pub(crate) struct CoreSignals {
1312 tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1313 new_round_sender: watch::Sender<Round>,
1314 context: Arc<Context>,
1315}
1316
1317impl CoreSignals {
1318 pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1319 let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1323 context.parameters.dag_state_cached_rounds as usize,
1324 );
1325 let (new_round_sender, new_round_receiver) = watch::channel(0);
1326
1327 let me = Self {
1328 tx_block_broadcast,
1329 new_round_sender,
1330 context,
1331 };
1332
1333 let receivers = CoreSignalsReceivers {
1334 rx_block_broadcast,
1335 new_round_receiver,
1336 };
1337
1338 (me, receivers)
1339 }
1340
1341 pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1344 if self.context.committee.size() > 1 {
1347 if extended_block.block.round() == GENESIS_ROUND {
1348 debug!("Ignoring broadcasting genesis block to peers");
1349 return Ok(());
1350 }
1351
1352 if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1353 warn!("Couldn't broadcast the block to any receiver: {err}");
1354 return Err(ConsensusError::Shutdown);
1355 }
1356 } else {
1357 debug!(
1358 "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1359 );
1360 }
1361 Ok(())
1362 }
1363
1364 pub(crate) fn new_round(&mut self, round_number: Round) {
1367 let _ = self.new_round_sender.send_replace(round_number);
1368 }
1369}
1370
1371pub(crate) struct CoreSignalsReceivers {
1374 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1375 new_round_receiver: watch::Receiver<Round>,
1376}
1377
1378impl CoreSignalsReceivers {
1379 pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1380 self.rx_block_broadcast.resubscribe()
1381 }
1382
1383 pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1384 self.new_round_receiver.clone()
1385 }
1386}
1387
1388#[cfg(test)]
1391pub(crate) async fn create_cores(
1392 context: Context,
1393 authorities: Vec<Stake>,
1394) -> Vec<CoreTextFixture> {
1395 let mut cores = Vec::new();
1396
1397 for index in 0..authorities.len() {
1398 let own_index = AuthorityIndex::new_for_test(index as u32);
1399 let core =
1400 CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await;
1401 cores.push(core);
1402 }
1403 cores
1404}
1405
1406#[cfg(test)]
1407pub(crate) struct CoreTextFixture {
1408 pub(crate) core: Core,
1409 pub(crate) transaction_certifier: TransactionCertifier,
1410 pub(crate) signal_receivers: CoreSignalsReceivers,
1411 pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
1412 pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
1413 pub(crate) _blocks_output_receiver: UnboundedReceiver<CertifiedBlocksOutput>,
1414 pub(crate) dag_state: Arc<RwLock<DagState>>,
1415 pub(crate) store: Arc<MemStore>,
1416}
1417
1418#[cfg(test)]
1419impl CoreTextFixture {
1420 async fn new(
1421 context: Context,
1422 authorities: Vec<Stake>,
1423 own_index: AuthorityIndex,
1424 sync_last_known_own_block: bool,
1425 ) -> Self {
1426 let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1427 let mut context = context.clone();
1428 context = context
1429 .with_committee(committee)
1430 .with_authority_index(own_index);
1431 context
1432 .protocol_config
1433 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1434
1435 let context = Arc::new(context);
1436 let store = Arc::new(MemStore::new());
1437 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1438
1439 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1440 let leader_schedule = Arc::new(
1441 LeaderSchedule::from_store(context.clone(), dag_state.clone())
1442 .with_num_commits_per_schedule(10),
1443 );
1444 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1445 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1446 let (blocks_sender, _blocks_receiver) =
1447 mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
1448 let transaction_certifier =
1449 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1450 transaction_certifier.recover(&NoopBlockVerifier, 0);
1451 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1452 let block_receiver = signal_receivers.block_broadcast_receiver();
1454
1455 let (commit_consumer, commit_output_receiver, blocks_output_receiver) =
1456 CommitConsumerArgs::new(0, 0);
1457 let commit_observer = CommitObserver::new(
1458 context.clone(),
1459 commit_consumer,
1460 dag_state.clone(),
1461 transaction_certifier.clone(),
1462 leader_schedule.clone(),
1463 )
1464 .await;
1465
1466 let block_signer = signers.remove(own_index.value()).1;
1467
1468 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1469 let core = Core::new(
1470 context,
1471 leader_schedule,
1472 transaction_consumer,
1473 transaction_certifier.clone(),
1474 block_manager,
1475 true,
1476 commit_observer,
1477 signals,
1478 block_signer,
1479 dag_state.clone(),
1480 sync_last_known_own_block,
1481 round_tracker,
1482 );
1483
1484 Self {
1485 core,
1486 transaction_certifier,
1487 signal_receivers,
1488 block_receiver,
1489 _commit_output_receiver: commit_output_receiver,
1490 _blocks_output_receiver: blocks_output_receiver,
1491 dag_state,
1492 store,
1493 }
1494 }
1495
1496 pub(crate) fn add_blocks(
1497 &mut self,
1498 blocks: Vec<VerifiedBlock>,
1499 ) -> ConsensusResult<BTreeSet<BlockRef>> {
1500 self.transaction_certifier
1501 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1502 self.core.add_blocks(blocks)
1503 }
1504}
1505
1506#[cfg(test)]
1507mod test {
1508 use std::{collections::BTreeSet, time::Duration};
1509
1510 use consensus_config::{AuthorityIndex, Parameters};
1511 use consensus_types::block::TransactionIndex;
1512 use futures::{StreamExt, stream::FuturesUnordered};
1513 use mysten_metrics::monitored_mpsc;
1514 use sui_protocol_config::ProtocolConfig;
1515 use tokio::time::sleep;
1516
1517 use super::*;
1518 use crate::{
1519 CommitConsumerArgs, CommitIndex,
1520 block::{TestBlock, genesis_blocks},
1521 block_verifier::NoopBlockVerifier,
1522 commit::CommitAPI,
1523 leader_scoring::ReputationScores,
1524 storage::{Store, WriteBatch, mem_store::MemStore},
1525 test_dag_builder::DagBuilder,
1526 test_dag_parser::parse_dag,
1527 transaction::{BlockStatus, TransactionClient},
1528 };
1529
1530 #[tokio::test]
1532 async fn test_core_recover_from_store_for_full_round() {
1533 telemetry_subscribers::init_for_testing();
1534 let (context, mut key_pairs) = Context::new_for_test(4);
1535 let context = Arc::new(context);
1536 let store = Arc::new(MemStore::new());
1537 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1538 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1539 let mut block_status_subscriptions = FuturesUnordered::new();
1540
1541 let mut last_round_blocks = genesis_blocks(&context);
1543 let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1544 for round in 1..=4 {
1545 let mut this_round_blocks = Vec::new();
1546 for (index, _authority) in context.committee.authorities() {
1547 let block = VerifiedBlock::new_for_test(
1548 TestBlock::new(round, index.value() as u32)
1549 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1550 .build(),
1551 );
1552
1553 if round == 1 && index == context.own_index {
1555 let subscription =
1556 transaction_consumer.subscribe_for_block_status_testing(block.reference());
1557 block_status_subscriptions.push(subscription);
1558 }
1559
1560 this_round_blocks.push(block);
1561 }
1562 all_blocks.extend(this_round_blocks.clone());
1563 last_round_blocks = this_round_blocks;
1564 }
1565 store
1567 .write(WriteBatch::default().blocks(all_blocks))
1568 .expect("Storage error");
1569
1570 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1572 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1573 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1574 context.clone(),
1575 dag_state.clone(),
1576 ));
1577 let (blocks_sender, _blocks_receiver) =
1578 monitored_mpsc::unbounded_channel("consensus_block_output");
1579 let transaction_certifier =
1580 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1581
1582 let (commit_consumer, _commit_receiver, _transaction_receiver) =
1583 CommitConsumerArgs::new(0, 0);
1584 let commit_observer = CommitObserver::new(
1585 context.clone(),
1586 commit_consumer,
1587 dag_state.clone(),
1588 transaction_certifier.clone(),
1589 leader_schedule.clone(),
1590 )
1591 .await;
1592
1593 let last_commit = store.read_last_commit().unwrap();
1595 assert!(last_commit.is_none());
1596 assert_eq!(dag_state.read().last_commit_index(), 0);
1597
1598 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1600 let (blocks_sender, _blocks_receiver) =
1601 monitored_mpsc::unbounded_channel("consensus_block_output");
1602 let transaction_certifier =
1603 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1604 transaction_certifier.recover(&NoopBlockVerifier, 0);
1605 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1607 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1608 let _core = Core::new(
1609 context.clone(),
1610 leader_schedule,
1611 transaction_consumer,
1612 transaction_certifier.clone(),
1613 block_manager,
1614 true,
1615 commit_observer,
1616 signals,
1617 key_pairs.remove(context.own_index.value()).1,
1618 dag_state.clone(),
1619 false,
1620 round_tracker,
1621 );
1622
1623 let mut new_round = signal_receivers.new_round_receiver();
1625 assert_eq!(*new_round.borrow_and_update(), 5);
1626
1627 let proposed_block = block_receiver
1629 .recv()
1630 .await
1631 .expect("A block should have been created");
1632 assert_eq!(proposed_block.block.round(), 5);
1633 let ancestors = proposed_block.block.ancestors();
1634
1635 assert_eq!(ancestors.len(), 4);
1637 for ancestor in ancestors {
1638 assert_eq!(ancestor.round, 4);
1639 }
1640
1641 dag_state.write().flush();
1643
1644 let last_commit = store
1648 .read_last_commit()
1649 .unwrap()
1650 .expect("last commit should be set");
1651 assert_eq!(last_commit.index(), 2);
1652 assert_eq!(dag_state.read().last_commit_index(), 2);
1653 let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1654 assert_eq!(all_stored_commits.len(), 2);
1655
1656 while let Some(result) = block_status_subscriptions.next().await {
1658 let status = result.unwrap();
1659 assert!(matches!(status, BlockStatus::Sequenced(_)));
1660 }
1661 }
1662
1663 #[tokio::test]
1666 async fn test_core_recover_from_store_for_partial_round() {
1667 telemetry_subscribers::init_for_testing();
1668
1669 let (context, mut key_pairs) = Context::new_for_test(4);
1670 let context = Arc::new(context);
1671 let store = Arc::new(MemStore::new());
1672 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1673 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1674
1675 let mut last_round_blocks = genesis_blocks(&context);
1677 let mut all_blocks = last_round_blocks.clone();
1678 for round in 1..=4 {
1679 let mut this_round_blocks = Vec::new();
1680
1681 let authorities_to_skip = if round == 4 {
1683 context.committee.validity_threshold() as usize
1684 } else {
1685 1
1687 };
1688
1689 for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1690 let block = TestBlock::new(round, index.value() as u32)
1691 .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1692 .build();
1693 this_round_blocks.push(VerifiedBlock::new_for_test(block));
1694 }
1695 all_blocks.extend(this_round_blocks.clone());
1696 last_round_blocks = this_round_blocks;
1697 }
1698
1699 store
1701 .write(WriteBatch::default().blocks(all_blocks))
1702 .expect("Storage error");
1703
1704 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1706 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1707 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1708 context.clone(),
1709 dag_state.clone(),
1710 ));
1711 let (blocks_sender, _blocks_receiver) =
1712 monitored_mpsc::unbounded_channel("consensus_block_output");
1713 let transaction_certifier =
1714 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1715
1716 let (commit_consumer, _commit_receiver, _transaction_receiver) =
1717 CommitConsumerArgs::new(0, 0);
1718 let commit_observer = CommitObserver::new(
1719 context.clone(),
1720 commit_consumer,
1721 dag_state.clone(),
1722 transaction_certifier.clone(),
1723 leader_schedule.clone(),
1724 )
1725 .await;
1726
1727 let last_commit = store.read_last_commit().unwrap();
1729 assert!(last_commit.is_none());
1730 assert_eq!(dag_state.read().last_commit_index(), 0);
1731
1732 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1734 let (blocks_sender, _blocks_receiver) =
1735 monitored_mpsc::unbounded_channel("consensus_block_output");
1736 let transaction_certifier =
1737 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1738 transaction_certifier.recover(&NoopBlockVerifier, 0);
1739 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1741 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1742 let mut core = Core::new(
1743 context.clone(),
1744 leader_schedule,
1745 transaction_consumer,
1746 transaction_certifier,
1747 block_manager,
1748 true,
1749 commit_observer,
1750 signals,
1751 key_pairs.remove(context.own_index.value()).1,
1752 dag_state.clone(),
1753 false,
1754 round_tracker,
1755 );
1756
1757 let mut new_round = signal_receivers.new_round_receiver();
1760 assert_eq!(*new_round.borrow_and_update(), 5);
1761
1762 let proposed_block = block_receiver
1764 .recv()
1765 .await
1766 .expect("A block should have been created");
1767 assert_eq!(proposed_block.block.round(), 4);
1768 let ancestors = proposed_block.block.ancestors();
1769
1770 assert_eq!(ancestors.len(), 4);
1771 for ancestor in ancestors {
1772 if ancestor.author == context.own_index {
1773 assert_eq!(ancestor.round, 0);
1774 } else {
1775 assert_eq!(ancestor.round, 3);
1776 }
1777 }
1778
1779 core.try_commit(vec![]).ok();
1781
1782 core.dag_state.write().flush();
1784
1785 let last_commit = store
1789 .read_last_commit()
1790 .unwrap()
1791 .expect("last commit should be set");
1792 assert_eq!(last_commit.index(), 2);
1793 assert_eq!(dag_state.read().last_commit_index(), 2);
1794 let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1795 assert_eq!(all_stored_commits.len(), 2);
1796 }
1797
1798 #[tokio::test]
1799 async fn test_core_propose_after_genesis() {
1800 telemetry_subscribers::init_for_testing();
1801 let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1802 config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1803 config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1804 config
1805 });
1806
1807 let (context, mut key_pairs) = Context::new_for_test(4);
1808 let context = Arc::new(context);
1809 let store = Arc::new(MemStore::new());
1810 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1811
1812 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1813 let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1814 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1815 let (blocks_sender, _blocks_receiver) =
1816 monitored_mpsc::unbounded_channel("consensus_block_output");
1817 let transaction_certifier =
1818 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1819 let (signals, signal_receivers) = CoreSignals::new(context.clone());
1820 let mut block_receiver = signal_receivers.block_broadcast_receiver();
1822 let leader_schedule = Arc::new(LeaderSchedule::from_store(
1823 context.clone(),
1824 dag_state.clone(),
1825 ));
1826
1827 let (commit_consumer, _commit_receiver, _transaction_receiver) =
1828 CommitConsumerArgs::new(0, 0);
1829 let commit_observer = CommitObserver::new(
1830 context.clone(),
1831 commit_consumer,
1832 dag_state.clone(),
1833 transaction_certifier.clone(),
1834 leader_schedule.clone(),
1835 )
1836 .await;
1837
1838 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1839 let mut core = Core::new(
1840 context.clone(),
1841 leader_schedule,
1842 transaction_consumer,
1843 transaction_certifier,
1844 block_manager,
1845 true,
1846 commit_observer,
1847 signals,
1848 key_pairs.remove(context.own_index.value()).1,
1849 dag_state.clone(),
1850 false,
1851 round_tracker,
1852 );
1853
1854 let mut total = 0;
1856 let mut index = 0;
1857 loop {
1858 let transaction =
1859 bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1860 total += transaction.len();
1861 index += 1;
1862 let _w = transaction_client
1863 .submit_no_wait(vec![transaction])
1864 .await
1865 .unwrap();
1866
1867 if total >= 1_000 {
1869 break;
1870 }
1871 }
1872
1873 let extended_block = block_receiver
1875 .recv()
1876 .await
1877 .expect("A new block should have been created");
1878
1879 assert_eq!(extended_block.block.round(), 1);
1881 assert_eq!(extended_block.block.author().value(), 0);
1882 assert_eq!(extended_block.block.ancestors().len(), 4);
1883
1884 let mut total = 0;
1885 for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1886 total += transaction.data().len() as u64;
1887 let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1888 assert_eq!(format!("Transaction {i}"), transaction);
1889 }
1890 assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
1891
1892 let all_genesis = genesis_blocks(&context);
1894
1895 for ancestor in extended_block.block.ancestors() {
1896 all_genesis
1897 .iter()
1898 .find(|block| block.reference() == *ancestor)
1899 .expect("Block should be found amongst genesis blocks");
1900 }
1901
1902 assert!(core.try_propose(false).unwrap().is_none());
1904 assert!(core.try_propose(true).unwrap().is_none());
1905
1906 dag_state.write().flush();
1908
1909 let last_commit = store.read_last_commit().unwrap();
1911 assert!(last_commit.is_none());
1912 assert_eq!(dag_state.read().last_commit_index(), 0);
1913 }
1914
1915 #[tokio::test]
1916 async fn test_core_propose_once_receiving_a_quorum() {
1917 telemetry_subscribers::init_for_testing();
1918 let (context, _key_pairs) = Context::new_for_test(4);
1919 let mut core_fixture = CoreTextFixture::new(
1920 context.clone(),
1921 vec![1, 1, 1, 1],
1922 AuthorityIndex::new_for_test(0),
1923 false,
1924 )
1925 .await;
1926 let transaction_certifier = &core_fixture.transaction_certifier;
1927 let store = &core_fixture.store;
1928 let dag_state = &core_fixture.dag_state;
1929 let core = &mut core_fixture.core;
1930
1931 let mut expected_ancestors = BTreeSet::new();
1932
1933 let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1935 expected_ancestors.insert(block_1.reference());
1936 sleep(context.parameters.min_round_delay).await;
1938 transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1940 _ = core.add_blocks(vec![block_1]);
1941
1942 assert_eq!(core.last_proposed_round(), 1);
1943 expected_ancestors.insert(core.last_proposed_block().reference());
1944 assert!(core.try_propose(false).unwrap().is_none());
1946
1947 let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1949 expected_ancestors.insert(block_2.reference());
1950 sleep(context.parameters.min_round_delay).await;
1952 transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1954 _ = core.add_blocks(vec![block_2.clone()]);
1955
1956 assert_eq!(core.last_proposed_round(), 2);
1957
1958 let proposed_block = core.last_proposed_block();
1959 assert_eq!(proposed_block.round(), 2);
1960 assert_eq!(proposed_block.author(), context.own_index);
1961 assert_eq!(proposed_block.ancestors().len(), 3);
1962 let ancestors = proposed_block.ancestors();
1963 let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1964 assert_eq!(ancestors, expected_ancestors);
1965
1966 let transaction_votes = proposed_block.transaction_votes();
1967 assert_eq!(transaction_votes.len(), 1);
1968 let transaction_vote = transaction_votes.first().unwrap();
1969 assert_eq!(transaction_vote.block_ref, block_2.reference());
1970 assert_eq!(transaction_vote.rejects, vec![1, 4]);
1971
1972 dag_state.write().flush();
1974
1975 let last_commit = store.read_last_commit().unwrap();
1977 assert!(last_commit.is_none());
1978 assert_eq!(dag_state.read().last_commit_index(), 0);
1979 }
1980
1981 #[tokio::test]
1982 async fn test_commit_and_notify_for_block_status() {
1983 telemetry_subscribers::init_for_testing();
1984 let (mut context, mut key_pairs) = Context::new_for_test(4);
1985 const GC_DEPTH: u32 = 2;
1986
1987 context
1988 .protocol_config
1989 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1990
1991 let context = Arc::new(context);
1992
1993 let store = Arc::new(MemStore::new());
1994 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1995 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1996 let mut block_status_subscriptions = FuturesUnordered::new();
1997
1998 let dag_str = "DAG {
1999 Round 0 : { 4 },
2000 Round 1 : { * },
2001 Round 2 : { * },
2002 Round 3 : {
2003 A -> [*],
2004 B -> [-A2],
2005 C -> [-A2],
2006 D -> [-A2],
2007 },
2008 Round 4 : {
2009 B -> [-A3],
2010 C -> [-A3],
2011 D -> [-A3],
2012 },
2013 Round 5 : {
2014 A -> [A3, B4, C4, D4]
2015 B -> [*],
2016 C -> [*],
2017 D -> [*],
2018 },
2019 Round 6 : { * },
2020 Round 7 : { * },
2021 Round 8 : { * },
2022 }";
2023
2024 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2025 dag_builder.print();
2026
2027 for block in dag_builder.blocks(1..=5) {
2029 if block.author() == context.own_index {
2030 let subscription =
2031 transaction_consumer.subscribe_for_block_status_testing(block.reference());
2032 block_status_subscriptions.push(subscription);
2033 }
2034 }
2035
2036 store
2038 .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2039 .expect("Storage error");
2040
2041 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2043 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2044 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2045 context.clone(),
2046 dag_state.clone(),
2047 ));
2048 let (blocks_sender, _blocks_receiver) =
2049 monitored_mpsc::unbounded_channel("consensus_block_output");
2050 let transaction_certifier =
2051 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2052
2053 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2054 CommitConsumerArgs::new(0, 0);
2055 let commit_observer = CommitObserver::new(
2056 context.clone(),
2057 commit_consumer,
2058 dag_state.clone(),
2059 transaction_certifier.clone(),
2060 leader_schedule.clone(),
2061 )
2062 .await;
2063
2064 dag_state.write().flush();
2066
2067 let last_commit = store.read_last_commit().unwrap();
2069 assert!(last_commit.is_none());
2070 assert_eq!(dag_state.read().last_commit_index(), 0);
2071
2072 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2074 let (blocks_sender, _blocks_receiver) =
2075 monitored_mpsc::unbounded_channel("consensus_block_output");
2076 let transaction_certifier =
2077 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2078 transaction_certifier.recover(&NoopBlockVerifier, 0);
2079 let _block_receiver = signal_receivers.block_broadcast_receiver();
2081 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2082 let _core = Core::new(
2083 context.clone(),
2084 leader_schedule,
2085 transaction_consumer,
2086 transaction_certifier,
2087 block_manager,
2088 true,
2089 commit_observer,
2090 signals,
2091 key_pairs.remove(context.own_index.value()).1,
2092 dag_state.clone(),
2093 false,
2094 round_tracker,
2095 );
2096
2097 dag_state.write().flush();
2099
2100 let last_commit = store
2101 .read_last_commit()
2102 .unwrap()
2103 .expect("last commit should be set");
2104
2105 assert_eq!(last_commit.index(), 5);
2106
2107 while let Some(result) = block_status_subscriptions.next().await {
2108 let status = result.unwrap();
2109
2110 match status {
2111 BlockStatus::Sequenced(block_ref) => {
2112 assert!(block_ref.round == 1 || block_ref.round == 5);
2113 }
2114 BlockStatus::GarbageCollected(block_ref) => {
2115 assert!(block_ref.round == 2 || block_ref.round == 3);
2116 }
2117 }
2118 }
2119 }
2120
2121 #[tokio::test]
2124 async fn test_multiple_commits_advance_threshold_clock() {
2125 telemetry_subscribers::init_for_testing();
2126 let (mut context, mut key_pairs) = Context::new_for_test(4);
2127 const GC_DEPTH: u32 = 2;
2128
2129 context
2130 .protocol_config
2131 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2132
2133 let context = Arc::new(context);
2134
2135 let store = Arc::new(MemStore::new());
2136 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2137 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2138
2139 let dag_str = "DAG {
2143 Round 0 : { 4 },
2144 Round 1 : { * },
2145 Round 2 : {
2146 B -> [-D1],
2147 C -> [-D1],
2148 D -> [-D1],
2149 },
2150 Round 3 : {
2151 B -> [*],
2152 C -> [*]
2153 D -> [*],
2154 },
2155 Round 4 : {
2156 A -> [*],
2157 B -> [*],
2158 C -> [*]
2159 D -> [*],
2160 },
2161 Round 5 : {
2162 A -> [*],
2163 B -> [*],
2164 C -> [*],
2165 D -> [*],
2166 },
2167 Round 6 : {
2168 B -> [A5, B5, C5, D1],
2169 C -> [A5, B5, C5, D1],
2170 D -> [A5, B5, C5, D1],
2171 },
2172 Round 7 : {
2173 B -> [*],
2174 C -> [*],
2175 D -> [*],
2176 },
2177 Round 8 : {
2178 B -> [*],
2179 C -> [*],
2180 D -> [*],
2181 },
2182 Round 9 : {
2183 B -> [*],
2184 C -> [*],
2185 D -> [*],
2186 },
2187 Round 10 : {
2188 B -> [*],
2189 C -> [*],
2190 D -> [*],
2191 },
2192 Round 11 : {
2193 B -> [*],
2194 C -> [*],
2195 D -> [*],
2196 },
2197 }";
2198
2199 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2200 dag_builder.print();
2201
2202 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2204 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2205 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2206 context.clone(),
2207 dag_state.clone(),
2208 ));
2209 let (blocks_sender, _blocks_receiver) =
2210 monitored_mpsc::unbounded_channel("consensus_block_output");
2211 let transaction_certifier =
2212 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2213
2214 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2215 CommitConsumerArgs::new(0, 0);
2216 let commit_observer = CommitObserver::new(
2217 context.clone(),
2218 commit_consumer,
2219 dag_state.clone(),
2220 transaction_certifier.clone(),
2221 leader_schedule.clone(),
2222 )
2223 .await;
2224
2225 dag_state.write().flush();
2227
2228 let last_commit = store.read_last_commit().unwrap();
2230 assert!(last_commit.is_none());
2231 assert_eq!(dag_state.read().last_commit_index(), 0);
2232
2233 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2235 let (blocks_sender, _blocks_receiver) =
2236 monitored_mpsc::unbounded_channel("consensus_block_output");
2237 let transaction_certifier =
2238 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2239 let _block_receiver = signal_receivers.block_broadcast_receiver();
2241 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2242 let mut core = Core::new(
2243 context.clone(),
2244 leader_schedule,
2245 transaction_consumer,
2246 transaction_certifier.clone(),
2247 block_manager,
2248 true,
2249 commit_observer,
2250 signals,
2251 key_pairs.remove(context.own_index.value()).1,
2252 dag_state.clone(),
2253 true,
2254 round_tracker,
2255 );
2256 core.set_last_known_proposed_round(4);
2259
2260 let mut all_blocks = dag_builder.blocks(1..=11);
2266 all_blocks.sort_by_key(|b| b.round());
2267 let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
2268 all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
2269 transaction_certifier.add_voted_blocks(voted_blocks);
2270 let blocks: Vec<VerifiedBlock> = all_blocks
2271 .into_iter()
2272 .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2273 .collect();
2274 core.add_blocks(blocks).expect("Should not fail");
2275
2276 assert_eq!(core.last_proposed_round(), 12);
2277 }
2278
2279 #[tokio::test]
2280 async fn test_core_set_min_propose_round() {
2281 telemetry_subscribers::init_for_testing();
2282 let (context, mut key_pairs) = Context::new_for_test(4);
2283 let context = Arc::new(context.with_parameters(Parameters {
2284 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2285 ..Default::default()
2286 }));
2287
2288 let store = Arc::new(MemStore::new());
2289 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2290
2291 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2292 let leader_schedule = Arc::new(LeaderSchedule::from_store(
2293 context.clone(),
2294 dag_state.clone(),
2295 ));
2296
2297 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2298 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2299 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2300 let (blocks_sender, _blocks_receiver) =
2301 monitored_mpsc::unbounded_channel("consensus_block_output");
2302 let transaction_certifier =
2303 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2304 let _block_receiver = signal_receivers.block_broadcast_receiver();
2306
2307 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2308 CommitConsumerArgs::new(0, 0);
2309 let commit_observer = CommitObserver::new(
2310 context.clone(),
2311 commit_consumer,
2312 dag_state.clone(),
2313 transaction_certifier.clone(),
2314 leader_schedule.clone(),
2315 )
2316 .await;
2317
2318 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2319 let mut core = Core::new(
2320 context.clone(),
2321 leader_schedule,
2322 transaction_consumer,
2323 transaction_certifier.clone(),
2324 block_manager,
2325 true,
2326 commit_observer,
2327 signals,
2328 key_pairs.remove(context.own_index.value()).1,
2329 dag_state.clone(),
2330 true,
2331 round_tracker,
2332 );
2333
2334 assert_eq!(
2336 core.last_proposed_round(),
2337 GENESIS_ROUND,
2338 "No block should have been created other than genesis"
2339 );
2340
2341 assert!(core.try_propose(true).unwrap().is_none());
2343
2344 let mut builder = DagBuilder::new(context.clone());
2346 builder.layers(1..=10).build();
2347
2348 let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2349
2350 transaction_certifier
2352 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2353 assert!(core.add_blocks(blocks).unwrap().is_empty());
2354
2355 core.round_tracker.write().update_from_probe(
2356 vec![
2357 vec![10, 10, 10, 10],
2358 vec![10, 10, 10, 10],
2359 vec![10, 10, 10, 10],
2360 vec![10, 10, 10, 10],
2361 ],
2362 vec![
2363 vec![10, 10, 10, 10],
2364 vec![10, 10, 10, 10],
2365 vec![10, 10, 10, 10],
2366 vec![10, 10, 10, 10],
2367 ],
2368 );
2369
2370 assert!(core.try_propose(true).unwrap().is_none());
2372
2373 core.set_last_known_proposed_round(10);
2376
2377 let block = core.try_propose(true).expect("No error").unwrap();
2378 assert_eq!(block.round(), 11);
2379 assert_eq!(block.ancestors().len(), 4);
2380
2381 let our_ancestor_included = block.ancestors()[0];
2382 assert_eq!(our_ancestor_included.author, context.own_index);
2383 assert_eq!(our_ancestor_included.round, 10);
2384 }
2385
2386 #[tokio::test(flavor = "current_thread", start_paused = true)]
2387 async fn test_core_try_new_block_leader_timeout() {
2388 telemetry_subscribers::init_for_testing();
2389
2390 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2397 let now = context.clock.timestamp_utc_ms();
2399 let max_timestamp = blocks
2400 .iter()
2401 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2402 .map(|block| block.timestamp_ms())
2403 .unwrap_or(0);
2404
2405 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2406 sleep(wait_time).await;
2407 }
2408
2409 let (context, _) = Context::new_for_test(4);
2410 let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2412
2413 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2418
2419 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2421 for round in 1..=3 {
2422 let mut this_round_blocks = Vec::new();
2423
2424 for core_fixture in cores.iter_mut() {
2425 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2426
2427 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2428
2429 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2431 assert_eq!(round - 1, r);
2432 if core_fixture.core.last_proposed_round() == r {
2433 core_fixture
2435 .core
2436 .try_propose(true)
2437 .unwrap()
2438 .unwrap_or_else(|| {
2439 panic!("Block should have been proposed for round {}", round)
2440 });
2441 }
2442 }
2443
2444 assert_eq!(core_fixture.core.last_proposed_round(), round);
2445
2446 this_round_blocks.push(core_fixture.core.last_proposed_block());
2447 }
2448
2449 last_round_blocks = this_round_blocks;
2450 }
2451
2452 for core_fixture in cores.iter_mut() {
2455 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2456
2457 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2458 assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2459 }
2460
2461 for core_fixture in cores.iter_mut() {
2464 assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2465 assert_eq!(core_fixture.core.last_proposed_round(), 4);
2466
2467 core_fixture.dag_state.write().flush();
2469
2470 let last_commit = core_fixture
2472 .store
2473 .read_last_commit()
2474 .unwrap()
2475 .expect("last commit should be set");
2476 assert_eq!(last_commit.index(), 1);
2479 let all_stored_commits = core_fixture
2480 .store
2481 .scan_commits((0..=CommitIndex::MAX).into())
2482 .unwrap();
2483 assert_eq!(all_stored_commits.len(), 1);
2484 }
2485 }
2486
2487 #[tokio::test(flavor = "current_thread", start_paused = true)]
2488 async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2489 telemetry_subscribers::init_for_testing();
2490
2491 async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2498 let now = context.clock.timestamp_utc_ms();
2500 let max_timestamp = blocks
2501 .iter()
2502 .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2503 .map(|block| block.timestamp_ms())
2504 .unwrap_or(0);
2505
2506 let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2507 sleep(wait_time).await;
2508 }
2509
2510 let (mut context, _) = Context::new_for_test(5);
2511 context
2512 .protocol_config
2513 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2514
2515 let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2517 let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2518
2519 let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2521 for round in 1..=30 {
2522 let mut this_round_blocks = Vec::new();
2523
2524 for core_fixture in cores.iter_mut() {
2525 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2526
2527 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2528
2529 core_fixture.core.round_tracker.write().update_from_probe(
2530 vec![
2531 vec![round, round, round, round, 0],
2532 vec![round, round, round, round, 0],
2533 vec![round, round, round, round, 0],
2534 vec![round, round, round, round, 0],
2535 vec![0, 0, 0, 0, 0],
2536 ],
2537 vec![
2538 vec![round, round, round, round, 0],
2539 vec![round, round, round, round, 0],
2540 vec![round, round, round, round, 0],
2541 vec![round, round, round, round, 0],
2542 vec![0, 0, 0, 0, 0],
2543 ],
2544 );
2545
2546 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2548 assert_eq!(round - 1, r);
2549 if core_fixture.core.last_proposed_round() == r {
2550 core_fixture
2552 .core
2553 .try_propose(true)
2554 .unwrap()
2555 .unwrap_or_else(|| {
2556 panic!("Block should have been proposed for round {}", round)
2557 });
2558 }
2559 }
2560
2561 assert_eq!(core_fixture.core.last_proposed_round(), round);
2562
2563 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2564 }
2565
2566 last_round_blocks = this_round_blocks;
2567 }
2568
2569 for round in 31..=40 {
2571 let mut this_round_blocks = Vec::new();
2572
2573 for core_fixture in all_cores.iter_mut() {
2574 wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2575
2576 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2577
2578 core_fixture.core.round_tracker.write().update_from_probe(
2581 vec![
2582 vec![round, round, round, round, 0],
2583 vec![round, round, round, round, 0],
2584 vec![round, round, round, round, 0],
2585 vec![round, round, round, round, 0],
2586 vec![0, 0, 0, 0, 0],
2587 ],
2588 vec![
2589 vec![round, round, round, round, 0],
2590 vec![round, round, round, round, 0],
2591 vec![round, round, round, round, 0],
2592 vec![round, round, round, round, 0],
2593 vec![0, 0, 0, 0, 0],
2594 ],
2595 );
2596
2597 if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2599 assert_eq!(round - 1, r);
2600 if core_fixture.core.last_proposed_round() == r {
2601 core_fixture
2603 .core
2604 .try_propose(true)
2605 .unwrap()
2606 .unwrap_or_else(|| {
2607 panic!("Block should have been proposed for round {}", round)
2608 });
2609 }
2610 }
2611
2612 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2613
2614 for block in this_round_blocks.iter() {
2615 if block.author() != AuthorityIndex::new_for_test(4) {
2616 assert_eq!(block.ancestors().len(), 4);
2619 } else {
2620 assert_eq!(block.ancestors().len(), 5);
2623 }
2624 }
2625 }
2626
2627 last_round_blocks = this_round_blocks;
2628 }
2629 }
2630
2631 #[tokio::test]
2632 async fn test_smart_ancestor_selection() {
2633 telemetry_subscribers::init_for_testing();
2634 let (mut context, mut key_pairs) = Context::new_for_test(7);
2635 context
2636 .protocol_config
2637 .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2638 let context = Arc::new(context.with_parameters(Parameters {
2639 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2640 ..Default::default()
2641 }));
2642
2643 let store = Arc::new(MemStore::new());
2644 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2645
2646 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2647 let leader_schedule = Arc::new(
2648 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2649 .with_num_commits_per_schedule(10),
2650 );
2651
2652 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2653 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2654 let (blocks_sender, _blocks_receiver) =
2655 monitored_mpsc::unbounded_channel("consensus_block_output");
2656 let transaction_certifier =
2657 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2658 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2659 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2661
2662 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2663 CommitConsumerArgs::new(0, 0);
2664 let commit_observer = CommitObserver::new(
2665 context.clone(),
2666 commit_consumer,
2667 dag_state.clone(),
2668 transaction_certifier.clone(),
2669 leader_schedule.clone(),
2670 )
2671 .await;
2672
2673 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2674 let mut core = Core::new(
2675 context.clone(),
2676 leader_schedule,
2677 transaction_consumer,
2678 transaction_certifier.clone(),
2679 block_manager,
2680 true,
2681 commit_observer,
2682 signals,
2683 key_pairs.remove(context.own_index.value()).1,
2684 dag_state.clone(),
2685 true,
2686 round_tracker.clone(),
2687 );
2688
2689 assert_eq!(
2691 core.last_proposed_round(),
2692 GENESIS_ROUND,
2693 "No block should have been created other than genesis"
2694 );
2695
2696 assert!(core.try_propose(true).unwrap().is_none());
2698
2699 let mut builder = DagBuilder::new(context.clone());
2701 builder
2702 .layers(1..=12)
2703 .authorities(vec![AuthorityIndex::new_for_test(1)])
2704 .skip_block()
2705 .build();
2706 let blocks = builder.blocks(1..=12);
2707 transaction_certifier
2709 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2710 assert!(core.add_blocks(blocks).unwrap().is_empty());
2711 core.set_last_known_proposed_round(12);
2712
2713 round_tracker.write().update_from_probe(
2714 vec![
2715 vec![12, 12, 12, 12, 12, 12, 12],
2716 vec![0, 0, 0, 0, 0, 0, 0],
2717 vec![12, 12, 12, 12, 12, 12, 12],
2718 vec![12, 12, 12, 12, 12, 12, 12],
2719 vec![12, 12, 12, 12, 12, 12, 12],
2720 vec![12, 12, 12, 12, 12, 12, 12],
2721 vec![12, 12, 12, 12, 12, 12, 12],
2722 ],
2723 vec![
2724 vec![12, 12, 12, 12, 12, 12, 12],
2725 vec![0, 0, 0, 0, 0, 0, 0],
2726 vec![12, 12, 12, 12, 12, 12, 12],
2727 vec![12, 12, 12, 12, 12, 12, 12],
2728 vec![12, 12, 12, 12, 12, 12, 12],
2729 vec![12, 12, 12, 12, 12, 12, 12],
2730 vec![12, 12, 12, 12, 12, 12, 12],
2731 ],
2732 );
2733
2734 let block = core.try_propose(true).expect("No error").unwrap();
2735 assert_eq!(block.round(), 13);
2736 assert_eq!(block.ancestors().len(), 7);
2737
2738 builder
2740 .layers(13..=14)
2741 .authorities(vec![AuthorityIndex::new_for_test(0)])
2742 .skip_block()
2743 .build();
2744 let blocks = builder.blocks(13..=14);
2745 transaction_certifier
2746 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2747 assert!(core.add_blocks(blocks).unwrap().is_empty());
2748
2749 let block = core.try_propose(true).expect("No error").unwrap();
2752 assert_eq!(block.round(), 15);
2753 assert_eq!(block.ancestors().len(), 6);
2754
2755 let round_14_ancestors = builder.last_ancestors.clone();
2758 builder
2759 .layer(15)
2760 .authorities(vec![
2761 AuthorityIndex::new_for_test(0),
2762 AuthorityIndex::new_for_test(5),
2763 AuthorityIndex::new_for_test(6),
2764 ])
2765 .skip_block()
2766 .build();
2767 let blocks = builder.blocks(15..=15);
2768 let authority_1_excluded_block_reference = blocks
2769 .iter()
2770 .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2771 .unwrap()
2772 .reference();
2773 sleep(context.parameters.min_round_delay).await;
2775 transaction_certifier
2777 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2778 assert!(core.add_blocks(blocks).unwrap().is_empty());
2779 assert_eq!(core.last_proposed_block().round(), 15);
2780
2781 builder
2782 .layer(15)
2783 .authorities(vec![
2784 AuthorityIndex::new_for_test(0),
2785 AuthorityIndex::new_for_test(1),
2786 AuthorityIndex::new_for_test(2),
2787 AuthorityIndex::new_for_test(3),
2788 AuthorityIndex::new_for_test(4),
2789 ])
2790 .skip_block()
2791 .override_last_ancestors(round_14_ancestors)
2792 .build();
2793 let blocks = builder.blocks(15..=15);
2794 let round_15_ancestors: Vec<BlockRef> = blocks
2795 .iter()
2796 .filter(|block| block.round() == 15)
2797 .map(|block| block.reference())
2798 .collect();
2799 let included_block_references = iter::once(&core.last_proposed_block())
2800 .chain(blocks.iter())
2801 .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2802 .map(|block| block.reference())
2803 .collect::<Vec<_>>();
2804
2805 transaction_certifier
2807 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2808 assert!(core.add_blocks(blocks).unwrap().is_empty());
2809 assert_eq!(core.last_proposed_block().round(), 16);
2810
2811 let extended_block = loop {
2813 let extended_block =
2814 tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2815 .await
2816 .unwrap()
2817 .unwrap();
2818 if extended_block.block.round() == 16 {
2819 break extended_block;
2820 }
2821 };
2822 assert_eq!(extended_block.block.round(), 16);
2823 assert_eq!(extended_block.block.author(), core.context.own_index);
2824 assert_eq!(extended_block.block.ancestors().len(), 6);
2825 assert_eq!(extended_block.block.ancestors(), included_block_references);
2826 assert_eq!(extended_block.excluded_ancestors.len(), 1);
2827 assert_eq!(
2828 extended_block.excluded_ancestors[0],
2829 authority_1_excluded_block_reference
2830 );
2831
2832 builder
2837 .layer(16)
2838 .authorities(vec![
2839 AuthorityIndex::new_for_test(0),
2840 AuthorityIndex::new_for_test(5),
2841 AuthorityIndex::new_for_test(6),
2842 ])
2843 .skip_block()
2844 .override_last_ancestors(round_15_ancestors)
2845 .build();
2846 let blocks = builder.blocks(16..=16);
2847 sleep(context.parameters.min_round_delay).await;
2849 transaction_certifier
2851 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2852 assert!(core.add_blocks(blocks).unwrap().is_empty());
2853 assert_eq!(core.last_proposed_block().round(), 16);
2854
2855 let block = core.try_propose(true).expect("No error").unwrap();
2858 assert_eq!(block.round(), 17);
2859 assert_eq!(block.ancestors().len(), 5);
2860
2861 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2863 .await
2864 .unwrap()
2865 .unwrap();
2866 assert_eq!(extended_block.block.round(), 17);
2867 assert_eq!(extended_block.block.author(), core.context.own_index);
2868 assert_eq!(extended_block.block.ancestors().len(), 5);
2869 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2870
2871 builder
2874 .layers(17..=22)
2875 .authorities(vec![AuthorityIndex::new_for_test(0)])
2876 .skip_block()
2877 .build();
2878 let blocks = builder.blocks(17..=22);
2879
2880 round_tracker.write().update_from_probe(
2886 vec![
2887 vec![22, 22, 22, 22, 22, 22, 22],
2888 vec![22, 22, 22, 22, 22, 22, 22],
2889 vec![22, 22, 22, 22, 22, 22, 22],
2890 vec![22, 22, 22, 22, 22, 22, 22],
2891 vec![22, 22, 22, 22, 22, 22, 22],
2892 vec![22, 22, 22, 22, 22, 22, 22],
2893 vec![22, 22, 22, 22, 22, 22, 22],
2894 ],
2895 vec![
2896 vec![22, 22, 22, 22, 22, 22, 22],
2897 vec![22, 22, 22, 22, 22, 22, 22],
2898 vec![22, 22, 22, 22, 22, 22, 22],
2899 vec![22, 22, 22, 22, 22, 22, 22],
2900 vec![22, 22, 22, 22, 22, 22, 22],
2901 vec![22, 22, 22, 22, 22, 22, 22],
2902 vec![22, 22, 22, 22, 22, 22, 22],
2903 ],
2904 );
2905
2906 let included_block_references = iter::once(&core.last_proposed_block())
2907 .chain(blocks.iter())
2908 .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
2909 .map(|block| block.reference())
2910 .collect::<Vec<_>>();
2911
2912 sleep(context.parameters.min_round_delay).await;
2914 transaction_certifier
2915 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2916 assert!(core.add_blocks(blocks).unwrap().is_empty());
2917 assert_eq!(core.last_proposed_block().round(), 23);
2918
2919 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2921 .await
2922 .unwrap()
2923 .unwrap();
2924 assert_eq!(extended_block.block.round(), 23);
2925 assert_eq!(extended_block.block.author(), core.context.own_index);
2926 assert_eq!(extended_block.block.ancestors().len(), 7);
2927 assert_eq!(extended_block.block.ancestors(), included_block_references);
2928 assert_eq!(extended_block.excluded_ancestors.len(), 0);
2929 }
2930
2931 #[tokio::test]
2932 async fn test_excluded_ancestor_limit() {
2933 telemetry_subscribers::init_for_testing();
2934 let (context, mut key_pairs) = Context::new_for_test(4);
2935 let context = Arc::new(context.with_parameters(Parameters {
2936 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2937 ..Default::default()
2938 }));
2939
2940 let store = Arc::new(MemStore::new());
2941 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2942
2943 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2944 let leader_schedule = Arc::new(
2945 LeaderSchedule::from_store(context.clone(), dag_state.clone())
2946 .with_num_commits_per_schedule(10),
2947 );
2948
2949 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2950 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2951 let (blocks_sender, _blocks_receiver) =
2952 monitored_mpsc::unbounded_channel("consensus_block_output");
2953 let transaction_certifier =
2954 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2955 let (signals, signal_receivers) = CoreSignals::new(context.clone());
2956 let mut block_receiver = signal_receivers.block_broadcast_receiver();
2958
2959 let (commit_consumer, _commit_receiver, _transaction_receiver) =
2960 CommitConsumerArgs::new(0, 0);
2961 let commit_observer = CommitObserver::new(
2962 context.clone(),
2963 commit_consumer,
2964 dag_state.clone(),
2965 transaction_certifier.clone(),
2966 leader_schedule.clone(),
2967 )
2968 .await;
2969
2970 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2971 let mut core = Core::new(
2972 context.clone(),
2973 leader_schedule,
2974 transaction_consumer,
2975 transaction_certifier.clone(),
2976 block_manager,
2977 true,
2978 commit_observer,
2979 signals,
2980 key_pairs.remove(context.own_index.value()).1,
2981 dag_state.clone(),
2982 true,
2983 round_tracker,
2984 );
2985
2986 assert_eq!(
2988 core.last_proposed_round(),
2989 GENESIS_ROUND,
2990 "No block should have been created other than genesis"
2991 );
2992
2993 let mut builder = DagBuilder::new(context.clone());
2995 builder.layers(1..=3).build();
2996
2997 builder
3001 .layer(4)
3002 .authorities(vec![AuthorityIndex::new_for_test(1)])
3003 .equivocate(9)
3004 .build();
3005 let blocks = builder.blocks(1..=4);
3006
3007 transaction_certifier
3009 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3010 assert!(core.add_blocks(blocks).unwrap().is_empty());
3011 core.set_last_known_proposed_round(3);
3012
3013 let block = core.try_propose(true).expect("No error").unwrap();
3014 assert_eq!(block.round(), 5);
3015 assert_eq!(block.ancestors().len(), 4);
3016
3017 let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3019 .await
3020 .unwrap()
3021 .unwrap();
3022 assert_eq!(extended_block.block.round(), 5);
3023 assert_eq!(extended_block.block.author(), core.context.own_index);
3024 assert_eq!(extended_block.block.ancestors().len(), 4);
3025 assert_eq!(extended_block.excluded_ancestors.len(), 8);
3026 }
3027
3028 #[tokio::test]
3029 async fn test_core_set_subscriber_exists() {
3030 telemetry_subscribers::init_for_testing();
3031 let (context, mut key_pairs) = Context::new_for_test(4);
3032 let context = Arc::new(context);
3033 let store = Arc::new(MemStore::new());
3034 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3035
3036 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3037 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3038 context.clone(),
3039 dag_state.clone(),
3040 ));
3041
3042 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3043 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3044 let (blocks_sender, _blocks_receiver) =
3045 monitored_mpsc::unbounded_channel("consensus_block_output");
3046 let transaction_certifier =
3047 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3048 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3049 let _block_receiver = signal_receivers.block_broadcast_receiver();
3051
3052 let (commit_consumer, _commit_receiver, _transaction_receiver) =
3053 CommitConsumerArgs::new(0, 0);
3054 let commit_observer = CommitObserver::new(
3055 context.clone(),
3056 commit_consumer,
3057 dag_state.clone(),
3058 transaction_certifier.clone(),
3059 leader_schedule.clone(),
3060 )
3061 .await;
3062
3063 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3064 let mut core = Core::new(
3065 context.clone(),
3066 leader_schedule,
3067 transaction_consumer,
3068 transaction_certifier.clone(),
3069 block_manager,
3070 false,
3072 commit_observer,
3073 signals,
3074 key_pairs.remove(context.own_index.value()).1,
3075 dag_state.clone(),
3076 false,
3077 round_tracker,
3078 );
3079
3080 assert_eq!(
3082 core.last_proposed_round(),
3083 GENESIS_ROUND,
3084 "No block should have been created other than genesis"
3085 );
3086
3087 assert!(core.try_propose(true).unwrap().is_none());
3089
3090 core.set_subscriber_exists(true);
3092
3093 assert!(core.try_propose(true).unwrap().is_some());
3095 }
3096
3097 #[tokio::test]
3098 async fn test_core_set_propagation_delay_per_authority() {
3099 telemetry_subscribers::init_for_testing();
3101 let (context, mut key_pairs) = Context::new_for_test(4);
3102 let context = Arc::new(context);
3103 let store = Arc::new(MemStore::new());
3104 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3105
3106 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3107 let leader_schedule = Arc::new(LeaderSchedule::from_store(
3108 context.clone(),
3109 dag_state.clone(),
3110 ));
3111
3112 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3113 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3114 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3115 let (blocks_sender, _blocks_receiver) =
3116 monitored_mpsc::unbounded_channel("consensus_block_output");
3117 let transaction_certifier =
3118 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3119 let _block_receiver = signal_receivers.block_broadcast_receiver();
3121
3122 let (commit_consumer, _commit_receiver, _transaction_receiver) =
3123 CommitConsumerArgs::new(0, 0);
3124 let commit_observer = CommitObserver::new(
3125 context.clone(),
3126 commit_consumer,
3127 dag_state.clone(),
3128 transaction_certifier.clone(),
3129 leader_schedule.clone(),
3130 )
3131 .await;
3132
3133 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3134 let mut core = Core::new(
3135 context.clone(),
3136 leader_schedule,
3137 transaction_consumer,
3138 transaction_certifier.clone(),
3139 block_manager,
3140 false,
3142 commit_observer,
3143 signals,
3144 key_pairs.remove(context.own_index.value()).1,
3145 dag_state.clone(),
3146 false,
3147 round_tracker.clone(),
3148 );
3149
3150 assert_eq!(
3152 core.last_proposed_round(),
3153 GENESIS_ROUND,
3154 "No block should have been created other than genesis"
3155 );
3156
3157 let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3162 transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3163 dag_state.write().accept_block(test_block);
3165
3166 round_tracker.write().update_from_probe(
3167 vec![
3168 vec![0, 0, 0, 0],
3169 vec![0, 0, 0, 0],
3170 vec![0, 0, 0, 0],
3171 vec![0, 0, 0, 0],
3172 ],
3173 vec![
3174 vec![0, 0, 0, 0],
3175 vec![0, 0, 0, 0],
3176 vec![0, 0, 0, 0],
3177 vec![0, 0, 0, 0],
3178 ],
3179 );
3180
3181 core.set_subscriber_exists(true);
3183
3184 assert!(core.try_propose(true).unwrap().is_none());
3186
3187 round_tracker.write().update_from_probe(
3191 vec![
3192 vec![1000, 1000, 1000, 1000],
3193 vec![1000, 1000, 1000, 1000],
3194 vec![1000, 1000, 1000, 1000],
3195 vec![1000, 1000, 1000, 1000],
3196 ],
3197 vec![
3198 vec![1000, 1000, 1000, 1000],
3199 vec![1000, 1000, 1000, 1000],
3200 vec![1000, 1000, 1000, 1000],
3201 vec![1000, 1000, 1000, 1000],
3202 ],
3203 );
3204
3205 for author in 1..4 {
3208 let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3209 transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3210 dag_state.write().accept_block(block);
3212 }
3213
3214 assert!(core.try_propose(true).unwrap().is_some());
3216 }
3217
3218 #[tokio::test(flavor = "current_thread", start_paused = true)]
3219 async fn test_leader_schedule_change() {
3220 telemetry_subscribers::init_for_testing();
3221 let default_params = Parameters::default();
3222
3223 let (context, _) = Context::new_for_test(4);
3224 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3226
3227 let mut last_round_blocks = Vec::new();
3229 for round in 1..=30 {
3230 let mut this_round_blocks = Vec::new();
3231
3232 sleep(default_params.min_round_delay).await;
3234
3235 for core_fixture in &mut cores {
3236 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3239
3240 core_fixture.core.round_tracker.write().update_from_probe(
3241 vec![
3242 vec![round, round, round, round],
3243 vec![round, round, round, round],
3244 vec![round, round, round, round],
3245 vec![round, round, round, round],
3246 ],
3247 vec![
3248 vec![round, round, round, round],
3249 vec![round, round, round, round],
3250 vec![round, round, round, round],
3251 vec![round, round, round, round],
3252 ],
3253 );
3254
3255 let new_round = receive(
3257 Duration::from_secs(1),
3258 core_fixture.signal_receivers.new_round_receiver(),
3259 )
3260 .await;
3261 assert_eq!(new_round, round);
3262
3263 let extended_block = tokio::time::timeout(
3265 Duration::from_secs(1),
3266 core_fixture.block_receiver.recv(),
3267 )
3268 .await
3269 .unwrap()
3270 .unwrap();
3271 assert_eq!(extended_block.block.round(), round);
3272 assert_eq!(
3273 extended_block.block.author(),
3274 core_fixture.core.context.own_index
3275 );
3276
3277 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3279
3280 let block = core_fixture.core.last_proposed_block();
3281
3282 assert_eq!(
3284 block.ancestors().len(),
3285 core_fixture.core.context.committee.size()
3286 );
3287 for ancestor in block.ancestors() {
3288 if block.round() > 1 {
3289 assert!(
3291 last_round_blocks
3292 .iter()
3293 .any(|block| block.reference() == *ancestor),
3294 "Reference from previous round should be added"
3295 );
3296 }
3297 }
3298 }
3299
3300 last_round_blocks = this_round_blocks;
3301 }
3302
3303 for core_fixture in cores {
3304 core_fixture.dag_state.write().flush();
3306
3307 let last_commit = core_fixture
3309 .store
3310 .read_last_commit()
3311 .unwrap()
3312 .expect("last commit should be set");
3313 assert_eq!(last_commit.index(), 27);
3317 let all_stored_commits = core_fixture
3318 .store
3319 .scan_commits((0..=CommitIndex::MAX).into())
3320 .unwrap();
3321 assert_eq!(all_stored_commits.len(), 27);
3322 assert_eq!(
3323 core_fixture
3324 .core
3325 .leader_schedule
3326 .leader_swap_table
3327 .read()
3328 .bad_nodes
3329 .len(),
3330 1
3331 );
3332 assert_eq!(
3333 core_fixture
3334 .core
3335 .leader_schedule
3336 .leader_swap_table
3337 .read()
3338 .good_nodes
3339 .len(),
3340 1
3341 );
3342 let expected_reputation_scores =
3343 ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3344 assert_eq!(
3345 core_fixture
3346 .core
3347 .leader_schedule
3348 .leader_swap_table
3349 .read()
3350 .reputation_scores,
3351 expected_reputation_scores
3352 );
3353 }
3354 }
3355
3356 #[tokio::test]
3357 async fn test_filter_new_commits() {
3358 telemetry_subscribers::init_for_testing();
3359
3360 let (context, _key_pairs) = Context::new_for_test(4);
3361 let context = context.with_parameters(Parameters {
3362 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3363 ..Default::default()
3364 });
3365
3366 let authority_index = AuthorityIndex::new_for_test(0);
3367 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3368 let mut core = core.core;
3369
3370 assert_eq!(
3372 core.last_proposed_round(),
3373 GENESIS_ROUND,
3374 "No block should have been created other than genesis"
3375 );
3376
3377 let mut dag_builder = DagBuilder::new(core.context.clone());
3379 dag_builder.layers(1..=12).build();
3380
3381 dag_builder.print();
3383 let blocks = dag_builder.blocks(1..=6);
3384
3385 for block in blocks {
3386 core.dag_state.write().accept_block(block);
3387 }
3388
3389 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3391
3392 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3394
3395 assert_eq!(committed_sub_dags.len(), 4);
3397
3398 println!("Case 1. Provide certified commits that are all before the last committed round.");
3400
3401 let certified_commits = sub_dags_and_commits
3403 .iter()
3404 .take(4)
3405 .map(|(_, c)| c)
3406 .cloned()
3407 .collect::<Vec<_>>();
3408 assert!(
3409 certified_commits.last().unwrap().index()
3410 <= committed_sub_dags.last().unwrap().commit_ref.index,
3411 "Highest certified commit should older than the highest committed index."
3412 );
3413
3414 let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3415
3416 assert!(certified_commits.is_empty());
3418
3419 println!("Case 2. Provide certified commits that are all after the last committed round.");
3420
3421 let certified_commits = sub_dags_and_commits
3423 .iter()
3424 .take(5)
3425 .map(|(_, c)| c.clone())
3426 .collect::<Vec<_>>();
3427
3428 let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3429
3430 assert_eq!(certified_commits.len(), 1);
3432 assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3433
3434 println!(
3435 "Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1."
3436 );
3437
3438 let certified_commits = sub_dags_and_commits
3440 .iter()
3441 .skip(5)
3442 .take(1)
3443 .map(|(_, c)| c.clone())
3444 .collect::<Vec<_>>();
3445
3446 let err = core
3447 .filter_new_commits(certified_commits.clone())
3448 .unwrap_err();
3449 match err {
3450 ConsensusError::UnexpectedCertifiedCommitIndex {
3451 expected_commit_index: 5,
3452 commit_index: 6,
3453 } => (),
3454 _ => panic!("Unexpected error: {:?}", err),
3455 }
3456 }
3457
3458 #[tokio::test]
3459 async fn test_add_certified_commits() {
3460 telemetry_subscribers::init_for_testing();
3461
3462 let (context, _key_pairs) = Context::new_for_test(4);
3463 let context = context.with_parameters(Parameters {
3464 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3465 ..Default::default()
3466 });
3467
3468 let authority_index = AuthorityIndex::new_for_test(0);
3469 let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3470 let store = core.store.clone();
3471 let mut core = core.core;
3472
3473 assert_eq!(
3475 core.last_proposed_round(),
3476 GENESIS_ROUND,
3477 "No block should have been created other than genesis"
3478 );
3479
3480 let mut dag_builder = DagBuilder::new(core.context.clone());
3482 dag_builder.layers(1..=12).build();
3483
3484 dag_builder.print();
3486 let blocks = dag_builder.blocks(1..=6);
3487
3488 for block in blocks {
3489 core.dag_state.write().accept_block(block);
3490 }
3491
3492 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3494
3495 let committed_sub_dags = core.try_commit(vec![]).unwrap();
3497
3498 assert_eq!(committed_sub_dags.len(), 4);
3500
3501 core.dag_state.write().flush();
3503
3504 println!("Case 1. Provide no certified commits. No commit should happen.");
3505
3506 let last_commit = store
3507 .read_last_commit()
3508 .unwrap()
3509 .expect("Last commit should be set");
3510 assert_eq!(last_commit.reference().index, 4);
3511
3512 println!(
3513 "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3514 );
3515
3516 let certified_commits = sub_dags_and_commits
3518 .iter()
3519 .skip(3)
3520 .take(5)
3521 .map(|(_, c)| c.clone())
3522 .collect::<Vec<_>>();
3523
3524 let blocks = dag_builder.blocks(8..=12);
3526 for block in blocks {
3527 core.dag_state.write().accept_block(block);
3528 }
3529
3530 core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3532 .expect("Should not fail");
3533
3534 core.dag_state.write().flush();
3536
3537 let commits = store.scan_commits((6..=10).into()).unwrap();
3538
3539 assert_eq!(commits.len(), 5);
3541
3542 for i in 6..=10 {
3543 let commit = &commits[i - 6];
3544 assert_eq!(commit.reference().index, i as u32);
3545 }
3546 }
3547
3548 #[tokio::test]
3549 async fn try_commit_with_certified_commits_gced_blocks() {
3550 const GC_DEPTH: u32 = 3;
3551 telemetry_subscribers::init_for_testing();
3552
3553 let (mut context, mut key_pairs) = Context::new_for_test(5);
3554 context
3555 .protocol_config
3556 .set_consensus_gc_depth_for_testing(GC_DEPTH);
3557 let context = Arc::new(context.with_parameters(Parameters {
3558 sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3559 ..Default::default()
3560 }));
3561
3562 let store = Arc::new(MemStore::new());
3563 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3564
3565 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3566 let leader_schedule = Arc::new(
3567 LeaderSchedule::from_store(context.clone(), dag_state.clone())
3568 .with_num_commits_per_schedule(10),
3569 );
3570
3571 let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3572 let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3573 let (signals, signal_receivers) = CoreSignals::new(context.clone());
3574 let (blocks_sender, _blocks_receiver) =
3575 monitored_mpsc::unbounded_channel("consensus_block_output");
3576 let transaction_certifier =
3577 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3578 let _block_receiver = signal_receivers.block_broadcast_receiver();
3580
3581 let (commit_consumer, _commit_receiver, _transaction_receiver) =
3582 CommitConsumerArgs::new(0, 0);
3583 let commit_observer = CommitObserver::new(
3584 context.clone(),
3585 commit_consumer,
3586 dag_state.clone(),
3587 transaction_certifier.clone(),
3588 leader_schedule.clone(),
3589 )
3590 .await;
3591
3592 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3593 let mut core = Core::new(
3594 context.clone(),
3595 leader_schedule,
3596 transaction_consumer,
3597 transaction_certifier.clone(),
3598 block_manager,
3599 true,
3600 commit_observer,
3601 signals,
3602 key_pairs.remove(context.own_index.value()).1,
3603 dag_state.clone(),
3604 true,
3605 round_tracker,
3606 );
3607
3608 assert_eq!(
3610 core.last_proposed_round(),
3611 GENESIS_ROUND,
3612 "No block should have been created other than genesis"
3613 );
3614
3615 let dag_str = "DAG {
3616 Round 0 : { 5 },
3617 Round 1 : { * },
3618 Round 2 : {
3619 A -> [-E1],
3620 B -> [-E1],
3621 C -> [-E1],
3622 D -> [-E1],
3623 },
3624 Round 3 : {
3625 A -> [*],
3626 B -> [*],
3627 C -> [*],
3628 D -> [*],
3629 },
3630 Round 4 : {
3631 A -> [*],
3632 B -> [*],
3633 C -> [*],
3634 D -> [*],
3635 },
3636 Round 5 : {
3637 A -> [*],
3638 B -> [*],
3639 C -> [*],
3640 D -> [*],
3641 E -> [A4, B4, C4, D4, E1]
3642 },
3643 Round 6 : { * },
3644 Round 7 : { * },
3645 }";
3646
3647 let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3648 dag_builder.print();
3649
3650 let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3652 .get_sub_dag_and_certified_commits(1..=5)
3653 .into_iter()
3654 .unzip();
3655
3656 let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3659
3660 assert_eq!(committed_sub_dags.len(), 4);
3662 for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3663 assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3664
3665 for block in committed_sub_dag.blocks.iter() {
3667 if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3668 panic!("Did not expect to commit block E1");
3669 }
3670 }
3671 }
3672 }
3673
3674 #[tokio::test(flavor = "current_thread", start_paused = true)]
3675 async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3676 parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3677 }
3678
3679 #[tokio::test(flavor = "current_thread", start_paused = true)]
3680 async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3681 parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3682 }
3683
3684 async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3685 num_leaders_per_round: Option<usize>,
3686 ) {
3687 telemetry_subscribers::init_for_testing();
3688 let default_params = Parameters::default();
3689
3690 let (mut context, _) = Context::new_for_test(6);
3691 context
3692 .protocol_config
3693 .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3694 let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3696
3697 let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3699 for round in 1..=33 {
3700 let mut this_round_blocks = Vec::new();
3701
3702 sleep(default_params.min_round_delay).await;
3704
3705 for core_fixture in &mut cores {
3706 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3709
3710 core_fixture.core.round_tracker.write().update_from_probe(
3711 vec![
3712 vec![round, round, round, round, round, round],
3713 vec![round, round, round, round, round, round],
3714 vec![round, round, round, round, round, round],
3715 vec![round, round, round, round, round, round],
3716 vec![round, round, round, round, round, round],
3717 vec![round, round, round, round, round, round],
3718 ],
3719 vec![
3720 vec![round, round, round, round, round, round],
3721 vec![round, round, round, round, round, round],
3722 vec![round, round, round, round, round, round],
3723 vec![round, round, round, round, round, round],
3724 vec![round, round, round, round, round, round],
3725 vec![round, round, round, round, round, round],
3726 ],
3727 );
3728
3729 let new_round = receive(
3731 Duration::from_secs(1),
3732 core_fixture.signal_receivers.new_round_receiver(),
3733 )
3734 .await;
3735 assert_eq!(new_round, round);
3736
3737 let extended_block = tokio::time::timeout(
3739 Duration::from_secs(1),
3740 core_fixture.block_receiver.recv(),
3741 )
3742 .await
3743 .unwrap()
3744 .unwrap();
3745 assert_eq!(extended_block.block.round(), round);
3746 assert_eq!(
3747 extended_block.block.author(),
3748 core_fixture.core.context.own_index
3749 );
3750
3751 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3753
3754 let block = core_fixture.core.last_proposed_block();
3755
3756 assert_eq!(
3758 block.ancestors().len(),
3759 core_fixture.core.context.committee.size()
3760 );
3761 for ancestor in block.ancestors() {
3762 if block.round() > 1 {
3763 assert!(
3765 last_round_blocks
3766 .iter()
3767 .any(|block| block.reference() == *ancestor),
3768 "Reference from previous round should be added"
3769 );
3770 }
3771 }
3772 }
3773
3774 last_round_blocks = this_round_blocks;
3775 }
3776
3777 for core_fixture in cores {
3778 let expected_commit_count = match num_leaders_per_round {
3790 Some(1) => 30,
3791 _ => 31,
3792 };
3793
3794 core_fixture.dag_state.write().flush();
3796
3797 let last_commit = core_fixture
3799 .store
3800 .read_last_commit()
3801 .unwrap()
3802 .expect("last commit should be set");
3803 assert_eq!(last_commit.index(), expected_commit_count);
3804 let all_stored_commits = core_fixture
3805 .store
3806 .scan_commits((0..=CommitIndex::MAX).into())
3807 .unwrap();
3808 assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3809 assert_eq!(
3810 core_fixture
3811 .core
3812 .leader_schedule
3813 .leader_swap_table
3814 .read()
3815 .bad_nodes
3816 .len(),
3817 1
3818 );
3819 assert_eq!(
3820 core_fixture
3821 .core
3822 .leader_schedule
3823 .leader_swap_table
3824 .read()
3825 .good_nodes
3826 .len(),
3827 1
3828 );
3829 let expected_reputation_scores =
3830 ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3831 assert_eq!(
3832 core_fixture
3833 .core
3834 .leader_schedule
3835 .leader_swap_table
3836 .read()
3837 .reputation_scores,
3838 expected_reputation_scores
3839 );
3840 }
3841 }
3842
3843 #[tokio::test]
3844 async fn test_core_signals() {
3845 telemetry_subscribers::init_for_testing();
3846 let default_params = Parameters::default();
3847
3848 let (context, _) = Context::new_for_test(4);
3849 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3851
3852 let mut last_round_blocks = Vec::new();
3854 for round in 1..=10 {
3855 let mut this_round_blocks = Vec::new();
3856
3857 sleep(default_params.min_round_delay).await;
3859
3860 for core_fixture in &mut cores {
3861 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3864
3865 core_fixture.core.round_tracker.write().update_from_probe(
3866 vec![
3867 vec![round, round, round, round],
3868 vec![round, round, round, round],
3869 vec![round, round, round, round],
3870 vec![round, round, round, round],
3871 ],
3872 vec![
3873 vec![round, round, round, round],
3874 vec![round, round, round, round],
3875 vec![round, round, round, round],
3876 vec![round, round, round, round],
3877 ],
3878 );
3879
3880 let new_round = receive(
3882 Duration::from_secs(1),
3883 core_fixture.signal_receivers.new_round_receiver(),
3884 )
3885 .await;
3886 assert_eq!(new_round, round);
3887
3888 let extended_block = tokio::time::timeout(
3890 Duration::from_secs(1),
3891 core_fixture.block_receiver.recv(),
3892 )
3893 .await
3894 .unwrap()
3895 .unwrap();
3896 assert_eq!(extended_block.block.round(), round);
3897 assert_eq!(
3898 extended_block.block.author(),
3899 core_fixture.core.context.own_index
3900 );
3901
3902 this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3904
3905 let block = core_fixture.core.last_proposed_block();
3906
3907 assert_eq!(
3909 block.ancestors().len(),
3910 core_fixture.core.context.committee.size()
3911 );
3912 for ancestor in block.ancestors() {
3913 if block.round() > 1 {
3914 assert!(
3916 last_round_blocks
3917 .iter()
3918 .any(|block| block.reference() == *ancestor),
3919 "Reference from previous round should be added"
3920 );
3921 }
3922 }
3923 }
3924
3925 last_round_blocks = this_round_blocks;
3926 }
3927
3928 for core_fixture in cores {
3929 core_fixture.dag_state.write().flush();
3931 let last_commit = core_fixture
3933 .store
3934 .read_last_commit()
3935 .unwrap()
3936 .expect("last commit should be set");
3937 assert_eq!(last_commit.index(), 7);
3941 let all_stored_commits = core_fixture
3942 .store
3943 .scan_commits((0..=CommitIndex::MAX).into())
3944 .unwrap();
3945 assert_eq!(all_stored_commits.len(), 7);
3946 }
3947 }
3948
3949 #[tokio::test]
3950 async fn test_core_compress_proposal_references() {
3951 telemetry_subscribers::init_for_testing();
3952 let default_params = Parameters::default();
3953
3954 let (context, _) = Context::new_for_test(4);
3955 let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3957
3958 let mut last_round_blocks = Vec::new();
3959 let mut all_blocks = Vec::new();
3960
3961 let excluded_authority = AuthorityIndex::new_for_test(3);
3962
3963 for round in 1..=10 {
3964 let mut this_round_blocks = Vec::new();
3965
3966 for core_fixture in &mut cores {
3967 if core_fixture.core.context.own_index == excluded_authority {
3969 continue;
3970 }
3971
3972 core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3974 core_fixture.core.round_tracker.write().update_from_probe(
3975 vec![
3976 vec![round, round, round, round],
3977 vec![round, round, round, round],
3978 vec![round, round, round, round],
3979 vec![round, round, round, round],
3980 ],
3981 vec![
3982 vec![round, round, round, round],
3983 vec![round, round, round, round],
3984 vec![round, round, round, round],
3985 vec![round, round, round, round],
3986 ],
3987 );
3988 core_fixture.core.new_block(round, true).unwrap();
3989
3990 let block = core_fixture.core.last_proposed_block();
3991 assert_eq!(block.round(), round);
3992
3993 this_round_blocks.push(block.clone());
3995 }
3996
3997 last_round_blocks = this_round_blocks.clone();
3998 all_blocks.extend(this_round_blocks);
3999 }
4000
4001 let core_fixture = &mut cores[excluded_authority];
4005 sleep(default_params.min_round_delay).await;
4007 core_fixture.add_blocks(all_blocks).unwrap();
4009
4010 let block = core_fixture.core.last_proposed_block();
4013 assert_eq!(block.round(), 11);
4014 assert_eq!(block.ancestors().len(), 4);
4015 for block_ref in block.ancestors() {
4016 if block_ref.author == excluded_authority {
4017 assert_eq!(block_ref.round, 1);
4018 } else {
4019 assert_eq!(block_ref.round, 10);
4020 }
4021 }
4022
4023 core_fixture.dag_state.write().flush();
4025
4026 let last_commit = core_fixture
4028 .store
4029 .read_last_commit()
4030 .unwrap()
4031 .expect("last commit should be set");
4032 assert_eq!(last_commit.index(), 6);
4036 let all_stored_commits = core_fixture
4037 .store
4038 .scan_commits((0..=CommitIndex::MAX).into())
4039 .unwrap();
4040 assert_eq!(all_stored_commits.len(), 6);
4041 }
4042
4043 #[tokio::test]
4044 async fn try_select_certified_leaders() {
4045 telemetry_subscribers::init_for_testing();
4047
4048 let (context, _) = Context::new_for_test(4);
4049
4050 let authority_index = AuthorityIndex::new_for_test(0);
4051 let core =
4052 CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
4053 let mut core = core.core;
4054
4055 let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4056 dag_builder.layers(1..=12).build();
4057
4058 let limit = 2;
4059
4060 let blocks = dag_builder.blocks(1..=12);
4061
4062 for block in blocks {
4063 core.dag_state.write().accept_block(block);
4064 }
4065
4066 let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4068 let mut certified_commits = sub_dags_and_commits
4069 .into_iter()
4070 .map(|(_, commit)| commit)
4071 .collect::<Vec<_>>();
4072
4073 let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
4074
4075 assert_eq!(leaders.len(), 2);
4077 assert_eq!(certified_commits.len(), 2);
4078 }
4079
4080 pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4081 tokio::time::timeout(timeout, receiver.changed())
4082 .await
4083 .expect("Timeout while waiting to read from receiver")
4084 .expect("Signal receive channel shouldn't be closed");
4085 *receiver.borrow_and_update()
4086 }
4087}