1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12 monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13 monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16
17use crate::{
18 BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
19 commit::DEFAULT_WAVE_LENGTH,
20 context::Context,
21 dag_state::DagState,
22 error::{ConsensusError, ConsensusResult},
23 stake_aggregator::{QuorumThreshold, StakeAggregator},
24 transaction_certifier::TransactionCertifier,
25};
26
27pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
31
32pub(crate) struct CommitFinalizerHandle {
34 sender: UnboundedSender<CommittedSubDag>,
35}
36
37impl CommitFinalizerHandle {
38 pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
40 self.sender.send(commit).map_err(|e| {
41 tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
42 ConsensusError::Shutdown
43 })
44 }
45}
46
47pub struct CommitFinalizer {
69 context: Arc<Context>,
70 dag_state: Arc<RwLock<DagState>>,
71 transaction_certifier: TransactionCertifier,
72 commit_sender: UnboundedSender<CommittedSubDag>,
73
74 last_processed_commit: Option<CommitIndex>,
76 pending_commits: VecDeque<CommitState>,
78 blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
80}
81
82impl CommitFinalizer {
83 pub fn new(
84 context: Arc<Context>,
85 dag_state: Arc<RwLock<DagState>>,
86 transaction_certifier: TransactionCertifier,
87 commit_sender: UnboundedSender<CommittedSubDag>,
88 ) -> Self {
89 Self {
90 context,
91 dag_state,
92 transaction_certifier,
93 commit_sender,
94 last_processed_commit: None,
95 pending_commits: VecDeque::new(),
96 blocks: Arc::new(RwLock::new(BTreeMap::new())),
97 }
98 }
99
100 pub(crate) fn start(
101 context: Arc<Context>,
102 dag_state: Arc<RwLock<DagState>>,
103 transaction_certifier: TransactionCertifier,
104 commit_sender: UnboundedSender<CommittedSubDag>,
105 ) -> CommitFinalizerHandle {
106 let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
107 let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
108 let _handle =
109 spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
110 CommitFinalizerHandle { sender }
111 }
112
113 async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
114 while let Some(committed_sub_dag) = receiver.recv().await {
115 let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
116 || committed_sub_dag.recovered_rejected_transactions;
117 let finalized_commits = if !already_finalized {
118 self.process_commit(committed_sub_dag).await
119 } else {
120 vec![committed_sub_dag]
121 };
122 if !finalized_commits.is_empty() {
123 self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
127 let mut dag_state = self.dag_state.write();
128 if !already_finalized {
129 for commit in &finalized_commits {
131 dag_state.add_finalized_commit(
132 commit.commit_ref,
133 commit.rejected_transactions_by_block.clone(),
134 );
135 }
136 }
137 dag_state.flush();
142 }
143 for commit in finalized_commits {
144 if let Err(e) = self.commit_sender.send(commit) {
145 tracing::warn!(
146 "Failed to send to commit handler, probably due to shutdown: {e:?}"
147 );
148 return;
149 }
150 }
151 }
152 }
153
154 pub async fn process_commit(
155 &mut self,
156 committed_sub_dag: CommittedSubDag,
157 ) -> Vec<CommittedSubDag> {
158 let _scope = monitored_scope("CommitFinalizer::process_commit");
159
160 if let Some(last_processed_commit) = self.last_processed_commit {
161 assert_eq!(
162 last_processed_commit + 1,
163 committed_sub_dag.commit_ref.index
164 );
165 }
166 self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
167
168 self.pending_commits
169 .push_back(CommitState::new(committed_sub_dag));
170
171 let mut finalized_commits = vec![];
172
173 for i in 0..self.pending_commits.len() {
192 let commit_state = &self.pending_commits[i];
193 if commit_state.pending_blocks.is_empty() {
194 continue;
196 }
197 if !commit_state.commit.decided_with_local_blocks {
202 let last_commit_state = self.pending_commits.back().unwrap();
203 if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
204 > last_commit_state.commit.leader.round
205 {
206 break;
207 }
208 }
209 self.try_direct_finalize_commit(i);
210 }
211 let direct_finalized_commits = self.pop_finalized_commits();
212 self.context
213 .metrics
214 .node_metrics
215 .finalizer_output_commits
216 .with_label_values(&["direct"])
217 .inc_by(direct_finalized_commits.len() as u64);
218 finalized_commits.extend(direct_finalized_commits);
219
220 if !self.pending_commits.is_empty() {
223 self.link_blocks_in_last_commit();
230 self.append_origin_descendants_from_last_commit();
231 while self.pending_commits.len() > 1 {
235 if !self.pending_commits[0].pending_blocks.is_empty() {
238 break;
239 }
240 self.try_indirect_finalize_first_commit().await;
242 let indirect_finalized_commits = self.pop_finalized_commits();
243 if indirect_finalized_commits.is_empty() {
244 break;
246 }
247 self.context
248 .metrics
249 .node_metrics
250 .finalizer_output_commits
251 .with_label_values(&["indirect"])
252 .inc_by(indirect_finalized_commits.len() as u64);
253 finalized_commits.extend(indirect_finalized_commits);
254 }
255 }
256
257 self.context
258 .metrics
259 .node_metrics
260 .finalizer_buffered_commits
261 .set(self.pending_commits.len() as i64);
262
263 finalized_commits
264 }
265
266 fn try_direct_finalize_commit(&mut self, index: usize) {
275 let metrics = &self.context.metrics.node_metrics;
276 let num_commits = self.pending_commits.len();
277 let commit_state = self
278 .pending_commits
279 .get_mut(index)
280 .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits));
281
282 let vote_gc_round = self
298 .dag_state
299 .read()
300 .calculate_gc_round(commit_state.commit.leader.round + INDIRECT_REJECT_DEPTH);
301 tracing::debug!(
302 "Trying to direct finalize commit {} using vote GC round {}",
303 commit_state.commit.commit_ref,
304 vote_gc_round,
305 );
306
307 assert!(!commit_state.pending_blocks.is_empty());
309 let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
310
311 for (block_ref, num_transactions) in pending_blocks {
312 if block_ref.round <= vote_gc_round && num_transactions > 0 {
313 let transactions =
315 (0..(num_transactions as TransactionIndex)).collect::<BTreeSet<_>>();
316 commit_state
317 .pending_transactions
318 .entry(block_ref)
319 .or_default()
320 .extend(transactions);
321 let hostname = &self.context.committee.authority(block_ref.author).hostname;
322 metrics
323 .finalizer_skipped_voting_blocks
324 .with_label_values(&[hostname.as_str(), "direct"])
325 .inc();
326 tracing::debug!(
327 "Block {} is potentially outside of GC bound from its leader {} in commit {}. Skipping direct finalization.",
328 block_ref,
329 commit_state.commit.leader,
330 commit_state.commit.commit_ref
331 );
332 continue;
333 }
334 let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
335 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
336 metrics
337 .finalizer_transaction_status
338 .with_label_values(&["direct_finalize"])
339 .inc_by((num_transactions - reject_votes.len()) as u64);
340 let hostname = &self.context.committee.authority(block_ref.author).hostname;
341 metrics
342 .finalizer_reject_votes
343 .with_label_values(&[hostname])
344 .inc_by(reject_votes.len() as u64);
345 for (transaction_index, stake) in reject_votes {
348 let entry = if stake < self.context.committee.quorum_threshold() {
351 commit_state
352 .pending_transactions
353 .entry(block_ref)
354 .or_default()
355 } else {
356 metrics
357 .finalizer_transaction_status
358 .with_label_values(&["direct_reject"])
359 .inc();
360 commit_state
361 .rejected_transactions
362 .entry(block_ref)
363 .or_default()
364 };
365 entry.insert(transaction_index);
366 }
367 }
368 }
369
370 fn link_blocks_in_last_commit(&mut self) {
373 let commit_state = self
374 .pending_commits
375 .back_mut()
376 .unwrap_or_else(|| panic!("No pending commit."));
377
378 let mut blocks = commit_state.commit.blocks.clone();
381 blocks.sort_by_key(|b| b.round());
382
383 let mut blocks_map = self.blocks.write();
384 for block in blocks {
385 let block_ref = block.reference();
386 for ancestor in block.ancestors() {
388 if let Some(ancestor_block) = blocks_map.get(ancestor) {
391 ancestor_block.write().children.insert(block_ref);
392 }
393 }
394 blocks_map.entry(block_ref).or_insert_with(|| {
396 RwLock::new(BlockState::new(block, commit_state.commit.commit_ref.index))
397 });
398 }
399 }
400
401 fn append_origin_descendants_from_last_commit(&mut self) {
422 let commit_state = self
423 .pending_commits
424 .back_mut()
425 .unwrap_or_else(|| panic!("No pending commit."));
426 let mut committed_blocks = commit_state.commit.blocks.clone();
427 committed_blocks.sort_by_key(|b| b.round());
428 let blocks_map = self.blocks.read();
429 for committed_block in committed_blocks {
430 let committed_block_ref = committed_block.reference();
431 let mut origin_ancestor_ref = *blocks_map
435 .get(&committed_block_ref)
436 .unwrap()
437 .read()
438 .block
439 .ancestors()
440 .first()
441 .unwrap();
442 while origin_ancestor_ref.author == committed_block_ref.author {
443 let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
444 break;
445 };
446 origin_ancestor_block
447 .write()
448 .origin_descendants
449 .push(committed_block_ref);
450 origin_ancestor_ref = *origin_ancestor_block
451 .read()
452 .block
453 .ancestors()
454 .first()
455 .unwrap();
456 }
457 }
458 }
459
460 async fn try_indirect_finalize_first_commit(&mut self) {
462 assert!(!self.pending_commits.is_empty());
464 assert!(self.pending_commits[0].pending_blocks.is_empty());
465
466 self.check_pending_transactions_in_first_commit();
468
469 self.try_indirect_finalize_pending_transactions_in_first_commit()
471 .await;
472
473 self.try_indirect_reject_pending_transactions_in_first_commit();
475 }
476
477 fn check_pending_transactions_in_first_commit(&mut self) {
478 let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
479
480 for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
482 let reject_votes: BTreeMap<TransactionIndex, Stake> = self
483 .transaction_certifier
484 .get_reject_votes(block_ref)
485 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
486 .into_iter()
487 .collect();
488 let mut rejected_transactions = vec![];
489 for &transaction_index in pending_transactions {
490 let reject_stake = reject_votes
492 .get(&transaction_index)
493 .copied()
494 .unwrap_or_default();
495 if reject_stake < self.context.committee.quorum_threshold() {
496 continue;
498 }
499 rejected_transactions.push(transaction_index);
501 }
502 if !rejected_transactions.is_empty() {
503 all_rejected_transactions.push((*block_ref, rejected_transactions));
504 }
505 }
506
507 for (block_ref, rejected_transactions) in all_rejected_transactions {
509 self.context
510 .metrics
511 .node_metrics
512 .finalizer_transaction_status
513 .with_label_values(&["direct_late_reject"])
514 .inc_by(rejected_transactions.len() as u64);
515 let curr_commit_state = &mut self.pending_commits[0];
516 curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
517 curr_commit_state
518 .rejected_transactions
519 .entry(block_ref)
520 .or_default()
521 .extend(rejected_transactions);
522 }
523 }
524
525 async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
526 tracing::debug!(
527 "Trying to indirectly finalize pending transactions in first commit {}",
528 self.pending_commits[0].commit.commit_ref,
529 );
530 let _scope = monitored_scope(
531 "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
532 );
533
534 let pending_blocks: Vec<_> = self.pending_commits[0]
535 .pending_transactions
536 .iter()
537 .map(|(k, v)| (*k, v.clone()))
538 .collect();
539
540 let gc_rounds = self
541 .pending_commits
542 .iter()
543 .map(|c| {
544 (
545 c.commit.commit_ref.index,
546 self.dag_state
547 .read()
548 .calculate_gc_round(c.commit.leader.round),
549 )
550 })
551 .collect::<Vec<_>>();
552
553 const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
555
556 let mut all_finalized_transactions = vec![];
558 let mut handles = Vec::new();
559 for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
562 let context = self.context.clone();
563 let blocks = self.blocks.clone();
564 let gc_rounds = gc_rounds.clone();
565 let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
566
567 let handle = tokio::task::spawn_blocking(move || {
568 let mut chunk_results = Vec::new();
569
570 for (block_ref, pending_transactions) in chunk {
571 let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
572 &context,
573 &blocks,
574 &gc_rounds,
575 block_ref,
576 pending_transactions,
577 );
578
579 if !finalized.is_empty() {
580 chunk_results.push((block_ref, finalized));
581 }
582 }
583
584 chunk_results
585 });
586
587 handles.push(handle);
588 }
589
590 for handle in handles {
592 let result = match handle.await {
593 Ok(chunk_results) => {
594 all_finalized_transactions.extend(chunk_results);
595 continue;
596 }
597 Err(e) => e,
598 };
599 if result.is_panic() {
600 std::panic::resume_unwind(result.into_panic());
601 }
602 tracing::info!("Process likely shutting down: {:?}", result);
603 return;
605 }
606
607 for (block_ref, finalized_transactions) in all_finalized_transactions {
608 self.context
609 .metrics
610 .node_metrics
611 .finalizer_transaction_status
612 .with_label_values(&["indirect_finalize"])
613 .inc_by(finalized_transactions.len() as u64);
614 self.pending_commits[0]
616 .remove_pending_transactions(&block_ref, &finalized_transactions);
617 }
618 }
619
620 fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
621 let curr_leader_round = self.pending_commits[0].commit.leader.round;
622 let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
623 if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
624 let curr_commit_state = &mut self.pending_commits[0];
625 assert!(curr_commit_state.pending_blocks.is_empty());
629 let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
633 for (block_ref, pending_transactions) in pending_transactions {
634 self.context
635 .metrics
636 .node_metrics
637 .finalizer_transaction_status
638 .with_label_values(&["indirect_reject"])
639 .inc_by(pending_transactions.len() as u64);
640 curr_commit_state
641 .rejected_transactions
642 .entry(block_ref)
643 .or_default()
644 .extend(pending_transactions);
645 }
646 }
647 }
648
649 fn try_indirect_finalize_pending_transactions_in_block(
653 context: &Arc<Context>,
654 blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
655 gc_rounds: &[(CommitIndex, Round)],
656 pending_block_ref: BlockRef,
657 pending_transactions: BTreeSet<TransactionIndex>,
658 ) -> Vec<TransactionIndex> {
659 if pending_transactions.is_empty() {
660 return vec![];
661 }
662 let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
663 pending_transactions
664 .into_iter()
665 .map(|transaction_index| (transaction_index, StakeAggregator::new()))
666 .collect();
667 let mut finalized_transactions = vec![];
668 let blocks_map = blocks.read();
669 let (pending_commit_index, mut to_visit_blocks) = {
671 let block_state = blocks_map.get(&pending_block_ref).unwrap().read();
672 (block_state.commit_index, block_state.children.clone())
673 };
674 let mut visited = BTreeSet::new();
676 let mut ignored = BTreeSet::new();
678 while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
680 if !visited.insert(curr_block_ref) {
681 continue;
682 }
683 let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
684 let votes_gced = Self::gced_transaction_votes_for_pending_block(
691 gc_rounds,
692 pending_block_ref.round,
693 pending_commit_index,
694 curr_block_state.commit_index,
695 );
696 if ignored.insert(curr_block_ref) {
698 ignored.extend(curr_block_state.origin_descendants.iter());
707 if votes_gced {
713 let hostname = &context
714 .committee
715 .authority(pending_block_ref.author)
716 .hostname;
717 context
718 .metrics
719 .node_metrics
720 .finalizer_skipped_voting_blocks
721 .with_label_values(&[hostname.as_str(), "indirect"])
722 .inc();
723 tracing::debug!(
724 "Block {} is potentially outside of GC bound from current block {}. Skipping indirect finalization.",
725 pending_block_ref,
726 curr_block_ref,
727 );
728 continue;
729 }
730 let curr_block_reject_votes = curr_block_state
732 .reject_votes
733 .get(&pending_block_ref)
734 .cloned()
735 .unwrap_or_default();
736 let mut newly_finalized = vec![];
738 for (index, stake) in &mut accept_votes {
739 if curr_block_reject_votes.contains(index) {
741 continue;
742 }
743 if !stake.add(curr_block_ref.author, &context.committee) {
745 continue;
746 }
747 newly_finalized.push(*index);
748 finalized_transactions.push(*index);
749 }
750 for index in newly_finalized {
752 accept_votes.remove(&index);
753 }
754 if accept_votes.is_empty() {
756 break;
757 }
758 }
759 to_visit_blocks.extend(
761 curr_block_state
762 .children
763 .iter()
764 .filter(|b| !visited.contains(*b)),
765 );
766 }
767 finalized_transactions
768 }
769
770 fn gced_transaction_votes_for_pending_block(
788 gc_rounds: &[(CommitIndex, Round)],
789 pending_block_round: Round,
790 pending_commit_index: CommitIndex,
791 current_commit_index: CommitIndex,
792 ) -> bool {
793 assert!(
794 pending_commit_index <= current_commit_index,
795 "Pending {pending_commit_index} should be <= current {current_commit_index}"
796 );
797 if pending_commit_index == current_commit_index {
798 return false;
799 }
800 let (commit_index, gc_round) = *gc_rounds
805 .get((current_commit_index - 1 - pending_commit_index) as usize)
806 .unwrap();
807 assert_eq!(
808 commit_index,
809 current_commit_index - 1,
810 "Commit index mismatch {commit_index} != {current_commit_index}"
811 );
812 pending_block_round <= gc_round
813 }
814
815 fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
816 let mut finalized_commits = vec![];
817
818 while let Some(commit_state) = self.pending_commits.front() {
819 if !commit_state.pending_blocks.is_empty()
820 || !commit_state.pending_transactions.is_empty()
821 {
822 break;
824 }
825
826 let commit_state = self.pending_commits.pop_front().unwrap();
828 let mut commit = commit_state.commit;
829 for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
830 commit
831 .rejected_transactions_by_block
832 .insert(block_ref, rejected_transactions.into_iter().collect());
833 }
834
835 let mut blocks_map = self.blocks.write();
837 for block in commit.blocks.iter() {
838 blocks_map.remove(&block.reference());
839 }
840
841 let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
842 last_commit_state.commit.leader.round - commit.leader.round
843 } else {
844 0
845 };
846 self.context
847 .metrics
848 .node_metrics
849 .finalizer_round_delay
850 .observe(round_delay as f64);
851
852 finalized_commits.push(commit);
853 }
854
855 finalized_commits
856 }
857
858 fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
859 let gc_round = self
862 .dag_state
863 .read()
864 .calculate_gc_round(last_finalized_commit_round);
865 self.transaction_certifier.run_gc(gc_round);
866 }
867
868 #[cfg(test)]
869 fn is_empty(&self) -> bool {
870 self.pending_commits.is_empty() && self.blocks.read().is_empty()
871 }
872}
873
874struct CommitState {
875 commit: CommittedSubDag,
876 pending_blocks: BTreeMap<BlockRef, usize>,
880 pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
885 rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
887}
888
889impl CommitState {
890 fn new(commit: CommittedSubDag) -> Self {
891 let pending_blocks: BTreeMap<_, _> = commit
892 .blocks
893 .iter()
894 .map(|b| (b.reference(), b.transactions().len()))
895 .collect();
896 assert!(!pending_blocks.is_empty());
897 Self {
898 commit,
899 pending_blocks,
900 pending_transactions: BTreeMap::new(),
901 rejected_transactions: BTreeMap::new(),
902 }
903 }
904
905 fn remove_pending_transactions(
906 &mut self,
907 block_ref: &BlockRef,
908 transactions: &[TransactionIndex],
909 ) {
910 let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
911 return;
912 };
913 for t in transactions {
914 block_pending_txns.remove(t);
915 }
916 if block_pending_txns.is_empty() {
917 self.pending_transactions.remove(block_ref);
918 }
919 }
920}
921
922struct BlockState {
923 block: VerifiedBlock,
925 children: BTreeSet<BlockRef>,
927 reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
929 origin_descendants: Vec<BlockRef>,
932 commit_index: CommitIndex,
934}
935
936impl BlockState {
937 fn new(block: VerifiedBlock, commit_index: CommitIndex) -> Self {
938 let reject_votes: BTreeMap<_, _> = block
939 .transaction_votes()
940 .iter()
941 .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
942 .collect();
943 let origin_descendants = Vec::with_capacity(8);
946 Self {
947 block,
948 children: BTreeSet::new(),
949 reject_votes,
950 origin_descendants,
951 commit_index,
952 }
953 }
954}
955
956#[cfg(test)]
957mod tests {
958 use crate::{
959 TestBlock, VerifiedBlock, block::BlockTransactionVotes,
960 commit_test_fixture::CommitTestFixture, test_dag_builder::DagBuilder,
961 };
962
963 use super::*;
964
965 fn create_commit_finalizer_fixture() -> CommitTestFixture {
966 CommitTestFixture::with_options(4, 0, Some(5))
967 }
968
969 fn create_block(
970 round: Round,
971 authority: u32,
972 mut ancestors: Vec<BlockRef>,
973 num_transactions: usize,
974 reject_votes: Vec<BlockTransactionVotes>,
975 ) -> VerifiedBlock {
976 let i = ancestors
978 .iter()
979 .position(|b| b.author.value() == authority as usize)
980 .unwrap_or_else(|| {
981 panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
982 });
983 let b = ancestors.remove(i);
984 ancestors.insert(0, b);
985 let block = TestBlock::new(round, authority)
987 .set_ancestors(ancestors)
988 .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
989 .set_transaction_votes(reject_votes)
990 .build();
991 VerifiedBlock::new_for_test(block)
992 }
993
994 #[tokio::test]
995 async fn test_direct_finalize_no_reject_votes() {
996 let mut fixture = create_commit_finalizer_fixture();
997
998 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1000 dag_builder.layers(1..=4).num_transactions(10).build();
1001 let blocks = dag_builder.all_blocks();
1002 fixture.add_blocks(blocks.clone());
1003
1004 let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
1006 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1007 assert_eq!(committed_sub_dags.len(), 1);
1008 let committed_sub_dag = &committed_sub_dags[0];
1009
1010 let finalized_commits = fixture
1012 .commit_finalizer
1013 .process_commit(committed_sub_dag.clone())
1014 .await;
1015 assert_eq!(finalized_commits.len(), 1);
1016 let finalized_commit = &finalized_commits[0];
1017 assert_eq!(committed_sub_dag, finalized_commit);
1018
1019 assert!(fixture.commit_finalizer.is_empty());
1021 }
1022
1023 #[tokio::test]
1026 async fn test_direct_finalize_with_reject_votes() {
1027 let mut fixture = create_commit_finalizer_fixture();
1028
1029 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1031 dag_builder.layer(1).num_transactions(10).build();
1032
1033 let round_1_blocks = dag_builder.all_blocks();
1034 fixture.add_blocks_with_own_votes(
1035 round_1_blocks
1036 .iter()
1037 .map(|b| {
1038 if b.author().value() != 3 {
1039 (b.clone(), vec![])
1040 } else {
1041 (b.clone(), vec![0, 3])
1042 }
1043 })
1044 .collect(),
1045 );
1046
1047 let block_with_rejected_txn = round_1_blocks[3].clone();
1049 let reject_vote = BlockTransactionVotes {
1050 block_ref: block_with_rejected_txn.reference(),
1051 rejects: vec![0, 3],
1052 };
1053
1054 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1056 let round_2_blocks = vec![
1058 create_block(
1059 2,
1060 0,
1061 round_1_blocks.iter().map(|b| b.reference()).collect(),
1062 10,
1063 vec![reject_vote.clone()],
1064 ),
1065 create_block(2, 1, ancestors.clone(), 10, vec![]),
1066 create_block(2, 2, ancestors.clone(), 10, vec![]),
1067 ];
1068 fixture.add_blocks(round_2_blocks.clone());
1069
1070 let leader = round_2_blocks[0].clone();
1072 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1073 assert_eq!(committed_sub_dags.len(), 1);
1074 let committed_sub_dag = &committed_sub_dags[0];
1075 assert_eq!(committed_sub_dag.blocks.len(), 5);
1076
1077 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1079 let round_3_blocks = vec![
1080 create_block(3, 0, ancestors.clone(), 0, vec![]),
1081 create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
1082 create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
1083 create_block(
1084 3,
1085 3,
1086 std::iter::once(round_1_blocks[3].reference())
1087 .chain(ancestors.clone())
1088 .collect(),
1089 0,
1090 vec![reject_vote.clone()],
1091 ),
1092 ];
1093 fixture.add_blocks(round_3_blocks.clone());
1094
1095 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1097 let round_4_blocks = vec![
1098 create_block(4, 0, ancestors.clone(), 0, vec![]),
1099 create_block(4, 1, ancestors.clone(), 0, vec![]),
1100 create_block(4, 2, ancestors.clone(), 0, vec![]),
1101 create_block(4, 3, ancestors.clone(), 0, vec![]),
1102 ];
1103 fixture.add_blocks(round_4_blocks.clone());
1104
1105 let finalized_commits = fixture
1108 .commit_finalizer
1109 .process_commit(committed_sub_dag.clone())
1110 .await;
1111 assert_eq!(finalized_commits.len(), 1);
1112 let finalized_commit = &finalized_commits[0];
1113 assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1114 assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1115 assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1116 assert_eq!(
1117 finalized_commit
1118 .rejected_transactions_by_block
1119 .get(&block_with_rejected_txn.reference())
1120 .unwrap()
1121 .clone(),
1122 vec![0, 3],
1123 );
1124
1125 assert!(fixture.commit_finalizer.is_empty());
1127 }
1128
1129 #[tokio::test]
1134 async fn test_indirect_finalize_with_reject_votes() {
1135 let mut fixture = create_commit_finalizer_fixture();
1136
1137 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1139 dag_builder.layer(1).num_transactions(10).build();
1140
1141 let round_1_blocks = dag_builder.all_blocks();
1142 fixture.add_blocks_with_own_votes(
1143 round_1_blocks
1144 .iter()
1145 .map(|b| {
1146 if b.author().value() != 3 {
1147 (b.clone(), vec![])
1148 } else {
1149 (b.clone(), vec![0, 3])
1150 }
1151 })
1152 .collect(),
1153 );
1154
1155 let block_with_rejected_txn = round_1_blocks[3].clone();
1157 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1164 let round_2_blocks = vec![
1166 create_block(
1167 2,
1168 0,
1169 round_1_blocks.iter().map(|b| b.reference()).collect(),
1170 10,
1171 vec![BlockTransactionVotes {
1172 block_ref: block_with_rejected_txn.reference(),
1173 rejects: vec![1, 4],
1174 }],
1175 ),
1176 create_block(2, 1, ancestors.clone(), 10, vec![]),
1178 create_block(2, 2, ancestors.clone(), 10, vec![]),
1179 ];
1180 fixture.add_blocks(round_2_blocks.clone());
1181
1182 let mut leaders = vec![round_2_blocks[0].clone()];
1184
1185 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1187 let round_3_blocks = vec![
1188 create_block(3, 0, ancestors.clone(), 0, vec![]),
1189 create_block(
1190 3,
1191 1,
1192 ancestors.clone(),
1193 0,
1194 vec![BlockTransactionVotes {
1195 block_ref: block_with_rejected_txn.reference(),
1196 rejects: vec![1, 4, 7],
1197 }],
1198 ),
1199 create_block(
1200 3,
1201 3,
1202 std::iter::once(round_1_blocks[3].reference())
1203 .chain(ancestors.clone())
1204 .collect(),
1205 0,
1206 vec![],
1207 ),
1208 ];
1209 fixture.add_blocks(round_3_blocks.clone());
1210 leaders.push(round_3_blocks[2].clone());
1211
1212 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1214 let round_4_blocks = vec![
1215 create_block(4, 0, ancestors.clone(), 0, vec![]),
1216 create_block(4, 1, ancestors.clone(), 0, vec![]),
1217 create_block(
1218 4,
1219 2,
1220 std::iter::once(round_2_blocks[2].reference())
1221 .chain(ancestors.clone())
1222 .collect(),
1223 0,
1224 vec![BlockTransactionVotes {
1225 block_ref: block_with_rejected_txn.reference(),
1226 rejects: vec![1],
1227 }],
1228 ),
1229 create_block(4, 3, ancestors.clone(), 0, vec![]),
1230 ];
1231 fixture.add_blocks(round_4_blocks.clone());
1232 leaders.push(round_4_blocks[1].clone());
1233
1234 let mut last_round_blocks = round_4_blocks.clone();
1238 for r in 5..=7 {
1239 let ancestors: Vec<BlockRef> =
1240 last_round_blocks.iter().map(|b| b.reference()).collect();
1241 let round_blocks: Vec<_> = (0..4)
1242 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1243 .collect();
1244 fixture.add_blocks(round_blocks.clone());
1245 if r == 5 {
1246 leaders.push(round_blocks[0].clone());
1247 }
1248 last_round_blocks = round_blocks;
1249 }
1250
1251 assert_eq!(leaders.len(), 4);
1253 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1254 assert_eq!(committed_sub_dags.len(), 4);
1255
1256 for commit in committed_sub_dags.iter().take(3) {
1258 let finalized_commits = fixture
1259 .commit_finalizer
1260 .process_commit(commit.clone())
1261 .await;
1262 assert_eq!(finalized_commits.len(), 0);
1263 }
1264
1265 let finalized_commits = fixture
1267 .commit_finalizer
1268 .process_commit(committed_sub_dags[3].clone())
1269 .await;
1270 assert_eq!(finalized_commits.len(), 4);
1271
1272 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1274 assert_eq!(rejected_transactions.len(), 1);
1275 assert_eq!(
1276 rejected_transactions
1277 .get(&block_with_rejected_txn.reference())
1278 .unwrap(),
1279 &vec![1, 4]
1280 );
1281
1282 for commit in finalized_commits.iter().skip(1) {
1284 assert!(commit.rejected_transactions_by_block.is_empty());
1285 }
1286
1287 assert!(fixture.commit_finalizer.is_empty());
1289 }
1290
1291 #[tokio::test]
1293 async fn test_direct_finalize_with_gc() {
1294 let mut fixture = create_commit_finalizer_fixture();
1295 assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1296
1297 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1299 dag_builder.layer(1).num_transactions(10).build();
1300 let round_1_blocks = dag_builder.all_blocks();
1301 fixture.add_blocks(round_1_blocks.clone());
1302
1303 let block_rejected = round_1_blocks[3].clone();
1305
1306 let mut last_round_blocks: Vec<VerifiedBlock> = round_1_blocks
1309 .iter()
1310 .enumerate()
1311 .filter_map(|(i, b)| {
1312 if i != block_rejected.author().value() {
1313 Some(b.clone())
1314 } else {
1315 None
1316 }
1317 })
1318 .collect();
1319 for r in 2..=5 {
1320 let ancestors: Vec<BlockRef> =
1321 last_round_blocks.iter().map(|b| b.reference()).collect();
1322 last_round_blocks = [0, 1, 2]
1323 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1324 .to_vec();
1325 fixture.add_blocks(last_round_blocks.clone());
1326 }
1327
1328 let mut leaders = vec![];
1331 for r in 6..=9 {
1332 let ancestors: Vec<BlockRef> =
1333 last_round_blocks.iter().map(|b| b.reference()).collect();
1334 last_round_blocks = [0, 1, 2]
1335 .map(|i| {
1336 let mut ancestors = ancestors.clone();
1337 if i == 0 {
1338 ancestors.push(block_rejected.reference());
1340 }
1341 create_block(r, i, ancestors, 0, vec![])
1342 })
1343 .to_vec();
1344 leaders.push(last_round_blocks[0].clone());
1345 fixture.add_blocks(last_round_blocks.clone());
1346 }
1347
1348 assert_eq!(leaders.len(), 4);
1350 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1351 assert_eq!(committed_sub_dags.len(), 4);
1352
1353 assert!(committed_sub_dags[0].blocks.contains(&block_rejected));
1355
1356 for commit in committed_sub_dags.iter().take(3) {
1358 assert!(commit.decided_with_local_blocks);
1359 let finalized_commits = fixture
1360 .commit_finalizer
1361 .process_commit(commit.clone())
1362 .await;
1363 assert_eq!(finalized_commits.len(), 0);
1364 }
1365
1366 let finalized_commits = fixture
1368 .commit_finalizer
1369 .process_commit(committed_sub_dags[3].clone())
1370 .await;
1371 assert_eq!(finalized_commits.len(), 4);
1372
1373 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1377 assert_eq!(rejected_transactions.len(), 1);
1378 assert_eq!(
1379 rejected_transactions
1380 .get(&block_rejected.reference())
1381 .unwrap(),
1382 &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1383 );
1384
1385 for commit in finalized_commits.iter().skip(1) {
1387 assert!(commit.rejected_transactions_by_block.is_empty());
1388 }
1389
1390 assert!(fixture.commit_finalizer.is_empty());
1392 }
1393
1394 #[tokio::test]
1396 async fn test_indirect_reject_with_gc() {
1397 let mut fixture = create_commit_finalizer_fixture();
1398 assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1399
1400 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1402 dag_builder.layer(1).num_transactions(10).build();
1403
1404 let round_1_blocks = dag_builder.all_blocks();
1405 fixture.add_blocks(round_1_blocks.clone());
1406
1407 let block_with_rejected_txn = round_1_blocks[3].clone();
1409 let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1416 let round_2_blocks = vec![
1417 create_block(2, 0, ancestors.clone(), 0, vec![]),
1418 create_block(
1419 2,
1420 1,
1421 ancestors.clone(),
1422 0,
1423 vec![BlockTransactionVotes {
1424 block_ref: block_with_rejected_txn.reference(),
1425 rejects: vec![1],
1426 }],
1427 ),
1428 create_block(2, 2, ancestors.clone(), 0, vec![]),
1429 create_block(2, 3, ancestors.clone(), 0, vec![]),
1430 ];
1431 fixture.add_blocks(round_2_blocks.clone());
1432
1433 let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1436 .iter()
1437 .enumerate()
1438 .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1439 .collect();
1440 for r in 3..=6 {
1441 let ancestors: Vec<BlockRef> =
1442 last_round_blocks.iter().map(|b| b.reference()).collect();
1443 last_round_blocks = [0, 1, 3]
1444 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1445 .to_vec();
1446 fixture.add_blocks(last_round_blocks.clone());
1447 }
1448
1449 let mut leaders = vec![];
1451 for r in 7..=10 {
1452 let ancestors: Vec<BlockRef> =
1453 last_round_blocks.iter().map(|b| b.reference()).collect();
1454 last_round_blocks = (0..4)
1455 .map(|i| {
1456 let mut ancestors = ancestors.clone();
1457 if r == 7 && i == 2 {
1458 ancestors.push(round_2_blocks[2].reference());
1460 }
1461 create_block(r, i, ancestors, 0, vec![])
1462 })
1463 .collect();
1464 leaders.push(last_round_blocks[0].clone());
1465 fixture.add_blocks(last_round_blocks.clone());
1466 }
1467
1468 assert_eq!(leaders.len(), 4);
1470 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1471 assert_eq!(committed_sub_dags.len(), 4);
1472
1473 assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1475 for commit in committed_sub_dags.iter() {
1477 assert!(!commit.blocks.contains(&round_2_blocks[2]));
1478 }
1479
1480 for commit in committed_sub_dags.iter().take(3) {
1482 assert!(commit.decided_with_local_blocks);
1483 let finalized_commits = fixture
1484 .commit_finalizer
1485 .process_commit(commit.clone())
1486 .await;
1487 assert_eq!(finalized_commits.len(), 0);
1488 }
1489
1490 let finalized_commits = fixture
1492 .commit_finalizer
1493 .process_commit(committed_sub_dags[3].clone())
1494 .await;
1495 assert_eq!(finalized_commits.len(), 4);
1496
1497 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1501 assert_eq!(rejected_transactions.len(), 1);
1502 assert_eq!(
1503 rejected_transactions
1504 .get(&block_with_rejected_txn.reference())
1505 .unwrap(),
1506 &vec![1]
1507 );
1508
1509 for commit in finalized_commits.iter().skip(1) {
1511 assert!(commit.rejected_transactions_by_block.is_empty());
1512 }
1513
1514 assert!(fixture.commit_finalizer.is_empty());
1516 }
1517
1518 #[tokio::test]
1519 async fn test_finalize_remote_commits_with_reject_votes() {
1520 let mut fixture: CommitTestFixture = create_commit_finalizer_fixture();
1521 let mut all_blocks = vec![];
1522
1523 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1525 dag_builder.layer(1).num_transactions(10).build();
1526 let round_1_blocks = dag_builder.all_blocks();
1527 all_blocks.push(round_1_blocks.clone());
1528
1529 let mut leaders = vec![round_1_blocks[0].clone()];
1531
1532 let mut last_round_blocks = round_1_blocks.clone();
1534 for r in 2..=9 {
1535 let ancestors: Vec<BlockRef> =
1536 last_round_blocks.iter().map(|b| b.reference()).collect();
1537 let round_blocks: Vec<_> = (0..4)
1538 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1539 .collect();
1540 all_blocks.push(round_blocks.clone());
1541 if r <= 7 && r != 5 {
1542 leaders.push(round_blocks[r as usize % 4].clone());
1543 }
1544 last_round_blocks = round_blocks;
1545 }
1546
1547 assert_eq!(leaders.len(), 6);
1549
1550 async fn add_blocks_and_process_commit(
1551 fixture: &mut CommitTestFixture,
1552 leaders: &[VerifiedBlock],
1553 all_blocks: &[Vec<VerifiedBlock>],
1554 index: usize,
1555 local: bool,
1556 ) -> Vec<CommittedSubDag> {
1557 let leader = leaders[index].clone();
1558 if local {
1560 for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1561 fixture.add_blocks(round_blocks.clone());
1562 }
1563 } else {
1564 for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1565 fixture.add_blocks(round_blocks.clone());
1566 }
1567 };
1568 let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1570 assert_eq!(committed_sub_dags.len(), 1);
1571 let mut remote_commit = committed_sub_dags.pop().unwrap();
1572 remote_commit.decided_with_local_blocks = local;
1573 fixture
1575 .commit_finalizer
1576 .process_commit(remote_commit.clone())
1577 .await
1578 }
1579
1580 for i in 0..3 {
1582 let finalized_commits =
1583 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1584 assert!(finalized_commits.is_empty());
1585 }
1586
1587 let finalized_commits =
1589 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1590 assert_eq!(finalized_commits.len(), 1);
1591 assert_eq!(finalized_commits[0].commit_ref.index, 1);
1592 assert_eq!(finalized_commits[0].leader.round, 1);
1593
1594 let finalized_commits =
1596 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1597 assert_eq!(finalized_commits.len(), 2);
1598 assert_eq!(finalized_commits[0].commit_ref.index, 2);
1599 assert_eq!(finalized_commits[0].leader.round, 2);
1600 assert_eq!(finalized_commits[1].commit_ref.index, 3);
1601 assert_eq!(finalized_commits[1].leader.round, 3);
1602
1603 let finalized_commits =
1605 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1606 assert_eq!(finalized_commits.len(), 3);
1607 assert_eq!(finalized_commits[0].commit_ref.index, 4);
1608 assert_eq!(finalized_commits[0].leader.round, 4);
1609 assert_eq!(finalized_commits[1].commit_ref.index, 5);
1610 assert_eq!(finalized_commits[1].leader.round, 6);
1611 assert_eq!(finalized_commits[2].commit_ref.index, 6);
1612 assert_eq!(finalized_commits[2].leader.round, 7);
1613
1614 assert!(fixture.commit_finalizer.is_empty());
1616 }
1617}