1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12 monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13 monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19 BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20 commit::DEFAULT_WAVE_LENGTH,
21 context::Context,
22 dag_state::DagState,
23 error::{ConsensusError, ConsensusResult},
24 stake_aggregator::{QuorumThreshold, StakeAggregator},
25 transaction_certifier::TransactionCertifier,
26};
27
/// Round distance used by `try_indirect_reject_pending_transactions_in_first_commit()`.
/// Once the leader round of the newest buffered commit is at least this many rounds
/// past the leader round of the first buffered commit, every transaction still pending
/// in the first commit is moved to its rejected set.
pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33pub(crate) struct CommitFinalizerHandle {
35 sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39 pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41 self.sender.send(commit).map_err(|e| {
42 tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43 ConsensusError::Shutdown
44 })
45 }
46}
47
/// Buffers committed sub-dags, sorts the transactions of their blocks into finalized
/// and rejected sets, and forwards each commit on `commit_sender` once nothing in it is
/// pending any more.
pub struct CommitFinalizer {
    // Source of the protocol config flags, committee and metrics read by the methods below.
    context: Arc<Context>,
    // Receives finalized commits (`add_finalized_commit`), is flushed before commits are
    // forwarded, and computes the GC round.
    dag_state: Arc<RwLock<DagState>>,
    // Queried for per-block reject votes; garbage collected through `run_gc()`.
    transaction_certifier: TransactionCertifier,
    // Output channel for finalized commits.
    commit_sender: UnboundedSender<CommittedSubDag>,

    // Index of the last commit handed to `process_commit()`; used to assert that
    // commits arrive with consecutive indices.
    last_processed_commit: Option<CommitIndex>,
    // Commits that still have pending blocks or pending transactions, oldest first.
    pending_commits: VecDeque<CommitState>,
    // Per-block state for the blocks of buffered commits. Entries are added when a commit
    // is linked and removed when the commit is popped. Wrapped in `Arc` because it is
    // cloned into blocking tasks during indirect finalization.
    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
}
82
83impl CommitFinalizer {
84 pub fn new(
85 context: Arc<Context>,
86 dag_state: Arc<RwLock<DagState>>,
87 transaction_certifier: TransactionCertifier,
88 commit_sender: UnboundedSender<CommittedSubDag>,
89 ) -> Self {
90 Self {
91 context,
92 dag_state,
93 transaction_certifier,
94 commit_sender,
95 last_processed_commit: None,
96 pending_commits: VecDeque::new(),
97 blocks: Arc::new(RwLock::new(BTreeMap::new())),
98 }
99 }
100
101 pub(crate) fn start(
102 context: Arc<Context>,
103 dag_state: Arc<RwLock<DagState>>,
104 transaction_certifier: TransactionCertifier,
105 commit_sender: UnboundedSender<CommittedSubDag>,
106 ) -> CommitFinalizerHandle {
107 let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108 let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109 let _handle =
110 spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111 CommitFinalizerHandle { sender }
112 }
113
114 async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115 while let Some(committed_sub_dag) = receiver.recv().await {
116 let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117 || committed_sub_dag.recovered_rejected_transactions;
118 let finalized_commits = if !already_finalized {
119 self.process_commit(committed_sub_dag).await
120 } else {
121 vec![committed_sub_dag]
122 };
123 if !finalized_commits.is_empty() {
124 self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128 let mut dag_state = self.dag_state.write();
129 if !already_finalized {
130 for commit in &finalized_commits {
132 dag_state.add_finalized_commit(
133 commit.commit_ref,
134 commit.rejected_transactions_by_block.clone(),
135 );
136 }
137 }
138 dag_state.flush();
143 }
144 for commit in finalized_commits {
145 if let Err(e) = self.commit_sender.send(commit) {
146 tracing::warn!(
147 "Failed to send to commit handler, probably due to shutdown: {e:?}"
148 );
149 return;
150 }
151 }
152 }
153 }
154
155 pub async fn process_commit(
156 &mut self,
157 committed_sub_dag: CommittedSubDag,
158 ) -> Vec<CommittedSubDag> {
159 let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161 if let Some(last_processed_commit) = self.last_processed_commit {
162 assert_eq!(
163 last_processed_commit + 1,
164 committed_sub_dag.commit_ref.index
165 );
166 }
167 self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169 self.pending_commits
170 .push_back(CommitState::new(committed_sub_dag));
171
172 let mut finalized_commits = vec![];
173
174 for i in 0..self.pending_commits.len() {
193 let commit_state = &self.pending_commits[i];
194 if commit_state.pending_blocks.is_empty() {
195 continue;
197 }
198 if !commit_state.commit.decided_with_local_blocks {
203 let last_commit_state = self.pending_commits.back().unwrap();
204 if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205 > last_commit_state.commit.leader.round
206 {
207 break;
208 }
209 }
210 self.try_direct_finalize_commit(i);
211 }
212 let direct_finalized_commits = self.pop_finalized_commits();
213 self.context
214 .metrics
215 .node_metrics
216 .finalizer_output_commits
217 .with_label_values(&["direct"])
218 .inc_by(direct_finalized_commits.len() as u64);
219 finalized_commits.extend(direct_finalized_commits);
220
221 if !self.pending_commits.is_empty() {
224 self.link_blocks_in_last_commit();
231 self.append_origin_descendants_from_last_commit();
232 while self.pending_commits.len() > 1 {
236 if !self.pending_commits[0].pending_blocks.is_empty() {
239 break;
240 }
241 self.try_indirect_finalize_first_commit().await;
243 let indirect_finalized_commits = self.pop_finalized_commits();
244 if indirect_finalized_commits.is_empty() {
245 break;
247 }
248 self.context
249 .metrics
250 .node_metrics
251 .finalizer_output_commits
252 .with_label_values(&["indirect"])
253 .inc_by(indirect_finalized_commits.len() as u64);
254 finalized_commits.extend(indirect_finalized_commits);
255 }
256 }
257
258 self.context
259 .metrics
260 .node_metrics
261 .finalizer_buffered_commits
262 .set(self.pending_commits.len() as i64);
263
264 finalized_commits
265 }
266
267 fn try_direct_finalize_commit(&mut self, index: usize) {
269 let num_commits = self.pending_commits.len();
270 let commit_state = self
271 .pending_commits
272 .get_mut(index)
273 .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274 assert!(!commit_state.pending_blocks.is_empty());
277
278 let metrics = &self.context.metrics.node_metrics;
279 let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280 for (block_ref, num_transactions) in pending_blocks {
281 let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283 metrics
284 .finalizer_transaction_status
285 .with_label_values(&["direct_finalize"])
286 .inc_by((num_transactions - reject_votes.len()) as u64);
287 let hostname = &self.context.committee.authority(block_ref.author).hostname;
288 metrics
289 .finalizer_reject_votes
290 .with_label_values(&[hostname])
291 .inc_by(reject_votes.len() as u64);
292 for (transaction_index, stake) in reject_votes {
295 let entry = if stake < self.context.committee.quorum_threshold() {
298 commit_state
299 .pending_transactions
300 .entry(block_ref)
301 .or_default()
302 } else {
303 metrics
304 .finalizer_transaction_status
305 .with_label_values(&["direct_reject"])
306 .inc();
307 commit_state
308 .rejected_transactions
309 .entry(block_ref)
310 .or_default()
311 };
312 entry.insert(transaction_index);
313 }
314 }
315 }
316
317 fn link_blocks_in_last_commit(&mut self) {
320 let commit_state = self
321 .pending_commits
322 .back_mut()
323 .unwrap_or_else(|| panic!("No pending commit."));
324
325 let mut blocks = commit_state.commit.blocks.clone();
328 blocks.sort_by_key(|b| b.round());
329
330 let mut blocks_map = self.blocks.write();
331 for block in blocks {
332 let block_ref = block.reference();
333 for ancestor in block.ancestors() {
335 if let Some(ancestor_block) = blocks_map.get(ancestor) {
338 ancestor_block.write().children.insert(block_ref);
339 }
340 }
341 blocks_map
343 .entry(block_ref)
344 .or_insert_with(|| RwLock::new(BlockState::new(block)));
345 }
346 }
347
348 fn append_origin_descendants_from_last_commit(&mut self) {
369 let commit_state = self
370 .pending_commits
371 .back_mut()
372 .unwrap_or_else(|| panic!("No pending commit."));
373 let mut committed_blocks = commit_state.commit.blocks.clone();
374 committed_blocks.sort_by_key(|b| b.round());
375 let blocks_map = self.blocks.read();
376 for committed_block in committed_blocks {
377 let committed_block_ref = committed_block.reference();
378 let mut origin_ancestor_ref = *blocks_map
382 .get(&committed_block_ref)
383 .unwrap()
384 .read()
385 .block
386 .ancestors()
387 .first()
388 .unwrap();
389 while origin_ancestor_ref.author == committed_block_ref.author {
390 let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
391 break;
392 };
393 origin_ancestor_block
394 .write()
395 .origin_descendants
396 .push(committed_block_ref);
397 origin_ancestor_ref = *origin_ancestor_block
398 .read()
399 .block
400 .ancestors()
401 .first()
402 .unwrap();
403 }
404 }
405 }
406
    /// Runs the three indirect-finalization steps on the first buffered commit, in order:
    /// re-check reject votes, try to finalize pending transactions, then reject whatever
    /// is still pending if the commit is old enough.
    ///
    /// # Panics
    /// Panics if there is no buffered commit or if the first commit still has pending blocks.
    async fn try_indirect_finalize_first_commit(&mut self) {
        // There must be a first commit to work on.
        assert!(!self.pending_commits.is_empty());
        // The first commit must already have been processed by direct finalization.
        assert!(self.pending_commits[0].pending_blocks.is_empty());

        // Step 1: move pending transactions whose reject stake now reaches quorum.
        self.check_pending_transactions_in_first_commit();

        // Step 2: finalize pending transactions using votes from blocks in the buffered DAG.
        self.try_indirect_finalize_pending_transactions_in_first_commit()
            .await;

        // Step 3: reject the rest once the commit is INDIRECT_REJECT_DEPTH rounds behind.
        self.try_indirect_reject_pending_transactions_in_first_commit();
    }
423
424 fn check_pending_transactions_in_first_commit(&mut self) {
425 let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
426
427 for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
429 let reject_votes: BTreeMap<TransactionIndex, Stake> = self
430 .transaction_certifier
431 .get_reject_votes(block_ref)
432 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
433 .into_iter()
434 .collect();
435 let mut rejected_transactions = vec![];
436 for &transaction_index in pending_transactions {
437 let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
439 if reject_stake < self.context.committee.quorum_threshold() {
440 continue;
442 }
443 rejected_transactions.push(transaction_index);
445 }
446 if !rejected_transactions.is_empty() {
447 all_rejected_transactions.push((*block_ref, rejected_transactions));
448 }
449 }
450
451 for (block_ref, rejected_transactions) in all_rejected_transactions {
453 self.context
454 .metrics
455 .node_metrics
456 .finalizer_transaction_status
457 .with_label_values(&["direct_late_reject"])
458 .inc_by(rejected_transactions.len() as u64);
459 let curr_commit_state = &mut self.pending_commits[0];
460 curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
461 curr_commit_state
462 .rejected_transactions
463 .entry(block_ref)
464 .or_default()
465 .extend(rejected_transactions);
466 }
467 }
468
469 async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
470 let _scope = monitored_scope(
471 "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
472 );
473
474 let pending_blocks: Vec<_> = self.pending_commits[0]
475 .pending_transactions
476 .iter()
477 .map(|(k, v)| (*k, v.clone()))
478 .collect();
479
480 const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
482
483 let mut all_finalized_transactions = vec![];
485 let mut join_set = JoinSet::new();
486 for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
489 let context = self.context.clone();
490 let blocks = self.blocks.clone();
491 let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
492
493 join_set.spawn(tokio::task::spawn_blocking(move || {
494 let mut chunk_results = Vec::new();
495
496 for (block_ref, pending_transactions) in chunk {
497 let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
498 &context,
499 &blocks,
500 block_ref,
501 pending_transactions,
502 );
503
504 if !finalized.is_empty() {
505 chunk_results.push((block_ref, finalized));
506 }
507 }
508
509 chunk_results
510 }));
511 }
512
513 while let Some(result) = join_set.join_next().await {
515 let e = match result {
516 Ok(blocking_result) => match blocking_result {
517 Ok(chunk_results) => {
518 all_finalized_transactions.extend(chunk_results);
519 continue;
520 }
521 Err(e) => e,
522 },
523 Err(e) => e,
524 };
525 if e.is_panic() {
526 std::panic::resume_unwind(e.into_panic());
527 }
528 tracing::info!("Process likely shutting down: {:?}", e);
529 return;
531 }
532
533 for (block_ref, finalized_transactions) in all_finalized_transactions {
534 self.context
535 .metrics
536 .node_metrics
537 .finalizer_transaction_status
538 .with_label_values(&["indirect_finalize"])
539 .inc_by(finalized_transactions.len() as u64);
540 self.pending_commits[0]
542 .remove_pending_transactions(&block_ref, &finalized_transactions);
543 }
544 }
545
546 fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
547 let curr_leader_round = self.pending_commits[0].commit.leader.round;
548 let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
549 if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
550 let curr_commit_state = &mut self.pending_commits[0];
551 assert!(curr_commit_state.pending_blocks.is_empty());
555 let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
559 for (block_ref, pending_transactions) in pending_transactions {
560 self.context
561 .metrics
562 .node_metrics
563 .finalizer_transaction_status
564 .with_label_values(&["indirect_reject"])
565 .inc_by(pending_transactions.len() as u64);
566 curr_commit_state
567 .rejected_transactions
568 .entry(block_ref)
569 .or_default()
570 .extend(pending_transactions);
571 }
572 }
573 }
574
    /// Walks the recorded descendants of `pending_block_ref` in the block-state map and
    /// accumulates accept stake per pending transaction. Returns the indices of the
    /// transactions for which `StakeAggregator::add()` returned `true`.
    ///
    /// Takes shared `Arc` references instead of `&self` so that it can be called from a
    /// blocking task.
    ///
    /// # Panics
    /// Panics if the pending block or a visited descendant is missing from the map, or if
    /// a visited block has no ancestors.
    fn try_indirect_finalize_pending_transactions_in_block(
        context: &Arc<Context>,
        blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
        pending_block_ref: BlockRef,
        pending_transactions: BTreeSet<TransactionIndex>,
    ) -> Vec<TransactionIndex> {
        if pending_transactions.is_empty() {
            return vec![];
        }
        // One stake aggregator per pending transaction; entries are removed once finalized.
        let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
            pending_transactions
                .into_iter()
                .map(|transaction_index| (transaction_index, StakeAggregator::new()))
                .collect();
        let mut finalized_transactions = vec![];
        let blocks_map = blocks.read();
        // Start from the direct children of the pending block. Because this is a BTreeSet
        // drained with pop_first(), blocks are visited in ascending BlockRef order.
        let mut to_visit_blocks = blocks_map
            .get(&pending_block_ref)
            .unwrap()
            .read()
            .children
            .clone();
        // Blocks already traversed.
        let mut visited = BTreeSet::new();
        // Blocks whose votes must not be counted: every block already seen here, plus the
        // origin descendants of those blocks.
        let mut ignored = BTreeSet::new();
        while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
            if !visited.insert(curr_block_ref) {
                continue;
            }
            let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
            let curr_origin_ancestor_ref = curr_block_state.block.ancestors().first().unwrap();
            // True when the first ancestor has the same author, is from a later round than the
            // pending block, and is no longer in the map.
            // NOTE(review): presumably such an ancestor was GC'ed and may have voted on the
            // pending block itself — confirm.
            let skip_votes = curr_block_ref.author == curr_origin_ancestor_ref.author
                && pending_block_ref.round < curr_origin_ancestor_ref.round
                && !blocks_map.contains_key(curr_origin_ancestor_ref);
            // insert() returns false when the block was already marked as ignored, in which
            // case its votes are not counted but its children are still traversed below.
            if ignored.insert(curr_block_ref) {
                // Votes of this block's origin descendants are not counted either.
                ignored.extend(curr_block_state.origin_descendants.iter());
                if context.protocol_config.consensus_skip_gced_accept_votes() && skip_votes {
                    let hostname = &context.committee.authority(curr_block_ref.author).hostname;
                    context
                        .metrics
                        .node_metrics
                        .finalizer_skipped_voting_blocks
                        .with_label_values(&[hostname])
                        .inc();
                    // Note: this also skips queuing the children of the current block.
                    continue;
                }
                // Explicit reject votes of the current block against the pending block, if any.
                let curr_block_reject_votes = curr_block_state
                    .reject_votes
                    .get(&pending_block_ref)
                    .cloned()
                    .unwrap_or_default();
                let mut newly_finalized = vec![];
                for (index, stake) in &mut accept_votes {
                    // A transaction the block rejects gets no accept stake from it.
                    if curr_block_reject_votes.contains(index) {
                        continue;
                    }
                    // add() returning false means the transaction is not finalized yet
                    // (presumably: quorum not reached — confirm in StakeAggregator).
                    if !stake.add(curr_block_ref.author, &context.committee) {
                        continue;
                    }
                    newly_finalized.push(*index);
                    finalized_transactions.push(*index);
                }
                // Removed after the loop because accept_votes is borrowed while iterating.
                for index in newly_finalized {
                    accept_votes.remove(&index);
                }
                if accept_votes.is_empty() {
                    // Every pending transaction is finalized; stop the traversal.
                    break;
                }
            }
            // Continue with the children that have not been traversed yet.
            to_visit_blocks.extend(
                curr_block_state
                    .children
                    .iter()
                    .filter(|b| !visited.contains(*b)),
            );
        }
        finalized_transactions
    }
685
686 fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
687 let mut finalized_commits = vec![];
688
689 while let Some(commit_state) = self.pending_commits.front() {
690 if !commit_state.pending_blocks.is_empty()
691 || !commit_state.pending_transactions.is_empty()
692 {
693 break;
695 }
696
697 let commit_state = self.pending_commits.pop_front().unwrap();
699 let mut commit = commit_state.commit;
700 for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
701 commit
702 .rejected_transactions_by_block
703 .insert(block_ref, rejected_transactions.into_iter().collect());
704 }
705
706 let mut blocks_map = self.blocks.write();
708 for block in commit.blocks.iter() {
709 blocks_map.remove(&block.reference());
710 }
711
712 let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
713 last_commit_state.commit.leader.round - commit.leader.round
714 } else {
715 0
716 };
717 self.context
718 .metrics
719 .node_metrics
720 .finalizer_round_delay
721 .observe(round_delay as f64);
722
723 finalized_commits.push(commit);
724 }
725
726 finalized_commits
727 }
728
729 fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
730 let gc_round = self
733 .dag_state
734 .read()
735 .calculate_gc_round(last_finalized_commit_round);
736 self.transaction_certifier.run_gc(gc_round);
737 }
738
739 #[cfg(test)]
740 fn is_empty(&self) -> bool {
741 self.pending_commits.is_empty() && self.blocks.read().is_empty()
742 }
743}
744
745struct CommitState {
746 commit: CommittedSubDag,
747 pending_blocks: BTreeMap<BlockRef, usize>,
751 pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
756 rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
758}
759
760impl CommitState {
761 fn new(commit: CommittedSubDag) -> Self {
762 let pending_blocks: BTreeMap<_, _> = commit
763 .blocks
764 .iter()
765 .map(|b| (b.reference(), b.transactions().len()))
766 .collect();
767 assert!(!pending_blocks.is_empty());
768 Self {
769 commit,
770 pending_blocks,
771 pending_transactions: BTreeMap::new(),
772 rejected_transactions: BTreeMap::new(),
773 }
774 }
775
776 fn remove_pending_transactions(
777 &mut self,
778 block_ref: &BlockRef,
779 transactions: &[TransactionIndex],
780 ) {
781 let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
782 return;
783 };
784 for t in transactions {
785 block_pending_txns.remove(t);
786 }
787 if block_pending_txns.is_empty() {
788 self.pending_transactions.remove(block_ref);
789 }
790 }
791}
792
793struct BlockState {
794 block: VerifiedBlock,
796 children: BTreeSet<BlockRef>,
798 reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
800 origin_descendants: Vec<BlockRef>,
803}
804
805impl BlockState {
806 fn new(block: VerifiedBlock) -> Self {
807 let reject_votes: BTreeMap<_, _> = block
808 .transaction_votes()
809 .iter()
810 .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
811 .collect();
812 let origin_descendants = Vec::with_capacity(8);
815 Self {
816 block,
817 children: BTreeSet::new(),
818 reject_votes,
819 origin_descendants,
820 }
821 }
822}
823
824#[cfg(test)]
825mod tests {
826 use mysten_metrics::monitored_mpsc;
827 use parking_lot::RwLock;
828
829 use crate::{
830 TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
831 dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
832 test_dag_builder::DagBuilder,
833 };
834
835 use super::*;
836
837 struct Fixture {
838 context: Arc<Context>,
839 dag_state: Arc<RwLock<DagState>>,
840 transaction_certifier: TransactionCertifier,
841 linearizer: Linearizer,
842 commit_finalizer: CommitFinalizer,
843 }
844
845 impl Fixture {
846 fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
847 self.transaction_certifier
848 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
849 self.dag_state.write().accept_blocks(blocks);
850 }
851 }
852
853 fn create_commit_finalizer_fixture() -> Fixture {
854 let (mut context, _keys) = Context::new_for_test(4);
855 context
856 .protocol_config
857 .set_consensus_gc_depth_for_testing(5);
858 context
859 .protocol_config
860 .set_consensus_skip_gced_accept_votes_for_testing(true);
861 let context = Arc::new(context);
862 let dag_state = Arc::new(RwLock::new(DagState::new(
863 context.clone(),
864 Arc::new(MemStore::new()),
865 )));
866 let linearizer = Linearizer::new(context.clone(), dag_state.clone());
867 let (blocks_sender, _blocks_receiver) =
868 monitored_mpsc::unbounded_channel("consensus_block_output");
869 let transaction_certifier = TransactionCertifier::new(
870 context.clone(),
871 Arc::new(NoopBlockVerifier {}),
872 dag_state.clone(),
873 blocks_sender,
874 );
875 let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
876 let commit_finalizer = CommitFinalizer::new(
877 context.clone(),
878 dag_state.clone(),
879 transaction_certifier.clone(),
880 commit_sender,
881 );
882 Fixture {
883 context,
884 dag_state,
885 transaction_certifier,
886 linearizer,
887 commit_finalizer,
888 }
889 }
890
891 fn create_block(
892 round: Round,
893 authority: u32,
894 mut ancestors: Vec<BlockRef>,
895 num_transactions: usize,
896 reject_votes: Vec<BlockTransactionVotes>,
897 ) -> VerifiedBlock {
898 let i = ancestors
900 .iter()
901 .position(|b| b.author.value() == authority as usize)
902 .unwrap_or_else(|| {
903 panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
904 });
905 let b = ancestors.remove(i);
906 ancestors.insert(0, b);
907 let block = TestBlock::new(round, authority)
909 .set_ancestors(ancestors)
910 .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
911 .set_transaction_votes(reject_votes)
912 .build();
913 VerifiedBlock::new_for_test(block)
914 }
915
916 #[tokio::test]
917 async fn test_direct_finalize_no_reject_votes() {
918 let mut fixture = create_commit_finalizer_fixture();
919
920 let mut dag_builder = DagBuilder::new(fixture.context.clone());
922 dag_builder
923 .layers(1..=4)
924 .num_transactions(10)
925 .build()
926 .persist_layers(fixture.dag_state.clone());
927 let blocks = dag_builder.all_blocks();
928 fixture
929 .transaction_certifier
930 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
931
932 let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
934 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
935 assert_eq!(committed_sub_dags.len(), 1);
936 let committed_sub_dag = &committed_sub_dags[0];
937
938 let finalized_commits = fixture
940 .commit_finalizer
941 .process_commit(committed_sub_dag.clone())
942 .await;
943 assert_eq!(finalized_commits.len(), 1);
944 let finalized_commit = &finalized_commits[0];
945 assert_eq!(committed_sub_dag, finalized_commit);
946
947 assert!(fixture.commit_finalizer.is_empty());
949 }
950
951 #[tokio::test]
954 async fn test_direct_finalize_with_reject_votes() {
955 let mut fixture = create_commit_finalizer_fixture();
956
957 let mut dag_builder = DagBuilder::new(fixture.context.clone());
959 dag_builder
960 .layer(1)
961 .num_transactions(10)
962 .build()
963 .persist_layers(fixture.dag_state.clone());
964 let round_1_blocks = dag_builder.all_blocks();
965 fixture.transaction_certifier.add_voted_blocks(
966 round_1_blocks
967 .iter()
968 .map(|b| {
969 if b.author().value() != 3 {
970 (b.clone(), vec![])
971 } else {
972 (b.clone(), vec![0, 3])
973 }
974 })
975 .collect(),
976 );
977
978 let block_with_rejected_txn = round_1_blocks[3].clone();
980 let reject_vote = BlockTransactionVotes {
981 block_ref: block_with_rejected_txn.reference(),
982 rejects: vec![0, 3],
983 };
984
985 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
987 let round_2_blocks = vec![
989 create_block(
990 2,
991 0,
992 round_1_blocks.iter().map(|b| b.reference()).collect(),
993 10,
994 vec![reject_vote.clone()],
995 ),
996 create_block(2, 1, ancestors.clone(), 10, vec![]),
997 create_block(2, 2, ancestors.clone(), 10, vec![]),
998 ];
999 fixture.add_blocks(round_2_blocks.clone());
1000
1001 let leader = round_2_blocks[0].clone();
1003 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1004 assert_eq!(committed_sub_dags.len(), 1);
1005 let committed_sub_dag = &committed_sub_dags[0];
1006 assert_eq!(committed_sub_dag.blocks.len(), 5);
1007
1008 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1010 let round_3_blocks = vec![
1011 create_block(3, 0, ancestors.clone(), 0, vec![]),
1012 create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
1013 create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
1014 create_block(
1015 3,
1016 3,
1017 std::iter::once(round_1_blocks[3].reference())
1018 .chain(ancestors.clone())
1019 .collect(),
1020 0,
1021 vec![reject_vote.clone()],
1022 ),
1023 ];
1024 fixture.add_blocks(round_3_blocks.clone());
1025
1026 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1028 let round_4_blocks = vec![
1029 create_block(4, 0, ancestors.clone(), 0, vec![]),
1030 create_block(4, 1, ancestors.clone(), 0, vec![]),
1031 create_block(4, 2, ancestors.clone(), 0, vec![]),
1032 create_block(4, 3, ancestors.clone(), 0, vec![]),
1033 ];
1034 fixture.add_blocks(round_4_blocks.clone());
1035
1036 let finalized_commits = fixture
1039 .commit_finalizer
1040 .process_commit(committed_sub_dag.clone())
1041 .await;
1042 assert_eq!(finalized_commits.len(), 1);
1043 let finalized_commit = &finalized_commits[0];
1044 assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1045 assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1046 assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1047 assert_eq!(
1048 finalized_commit
1049 .rejected_transactions_by_block
1050 .get(&block_with_rejected_txn.reference())
1051 .unwrap()
1052 .clone(),
1053 vec![0, 3],
1054 );
1055
1056 assert!(fixture.commit_finalizer.is_empty());
1058 }
1059
1060 #[tokio::test]
1065 async fn test_indirect_finalize_with_reject_votes() {
1066 let mut fixture = create_commit_finalizer_fixture();
1067
1068 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1070 dag_builder
1071 .layer(1)
1072 .num_transactions(10)
1073 .build()
1074 .persist_layers(fixture.dag_state.clone());
1075 let round_1_blocks = dag_builder.all_blocks();
1076 fixture.transaction_certifier.add_voted_blocks(
1077 round_1_blocks
1078 .iter()
1079 .map(|b| {
1080 if b.author().value() != 3 {
1081 (b.clone(), vec![])
1082 } else {
1083 (b.clone(), vec![0, 3])
1084 }
1085 })
1086 .collect(),
1087 );
1088
1089 let block_with_rejected_txn = round_1_blocks[3].clone();
1091 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1098 let round_2_blocks = vec![
1100 create_block(
1101 2,
1102 0,
1103 round_1_blocks.iter().map(|b| b.reference()).collect(),
1104 10,
1105 vec![BlockTransactionVotes {
1106 block_ref: block_with_rejected_txn.reference(),
1107 rejects: vec![1, 4],
1108 }],
1109 ),
1110 create_block(2, 1, ancestors.clone(), 10, vec![]),
1112 create_block(2, 2, ancestors.clone(), 10, vec![]),
1113 ];
1114 fixture.add_blocks(round_2_blocks.clone());
1115
1116 let mut leaders = vec![round_2_blocks[0].clone()];
1118
1119 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1121 let round_3_blocks = vec![
1122 create_block(3, 0, ancestors.clone(), 0, vec![]),
1123 create_block(
1124 3,
1125 1,
1126 ancestors.clone(),
1127 0,
1128 vec![BlockTransactionVotes {
1129 block_ref: block_with_rejected_txn.reference(),
1130 rejects: vec![1, 4, 7],
1131 }],
1132 ),
1133 create_block(
1134 3,
1135 3,
1136 std::iter::once(round_1_blocks[3].reference())
1137 .chain(ancestors.clone())
1138 .collect(),
1139 0,
1140 vec![],
1141 ),
1142 ];
1143 fixture.add_blocks(round_3_blocks.clone());
1144 leaders.push(round_3_blocks[2].clone());
1145
1146 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1148 let round_4_blocks = vec![
1149 create_block(4, 0, ancestors.clone(), 0, vec![]),
1150 create_block(4, 1, ancestors.clone(), 0, vec![]),
1151 create_block(
1152 4,
1153 2,
1154 std::iter::once(round_2_blocks[2].reference())
1155 .chain(ancestors.clone())
1156 .collect(),
1157 0,
1158 vec![BlockTransactionVotes {
1159 block_ref: block_with_rejected_txn.reference(),
1160 rejects: vec![1],
1161 }],
1162 ),
1163 create_block(4, 3, ancestors.clone(), 0, vec![]),
1164 ];
1165 fixture.add_blocks(round_4_blocks.clone());
1166 leaders.push(round_4_blocks[1].clone());
1167
1168 let mut last_round_blocks = round_4_blocks.clone();
1172 for r in 5..=7 {
1173 let ancestors: Vec<BlockRef> =
1174 last_round_blocks.iter().map(|b| b.reference()).collect();
1175 let round_blocks: Vec<_> = (0..4)
1176 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1177 .collect();
1178 fixture.add_blocks(round_blocks.clone());
1179 if r == 5 {
1180 leaders.push(round_blocks[0].clone());
1181 }
1182 last_round_blocks = round_blocks;
1183 }
1184
1185 assert_eq!(leaders.len(), 4);
1187 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1188 assert_eq!(committed_sub_dags.len(), 4);
1189
1190 for commit in committed_sub_dags.iter().take(3) {
1192 let finalized_commits = fixture
1193 .commit_finalizer
1194 .process_commit(commit.clone())
1195 .await;
1196 assert_eq!(finalized_commits.len(), 0);
1197 }
1198
1199 let finalized_commits = fixture
1201 .commit_finalizer
1202 .process_commit(committed_sub_dags[3].clone())
1203 .await;
1204 assert_eq!(finalized_commits.len(), 4);
1205
1206 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1208 assert_eq!(rejected_transactions.len(), 1);
1209 assert_eq!(
1210 rejected_transactions
1211 .get(&block_with_rejected_txn.reference())
1212 .unwrap(),
1213 &vec![1, 4]
1214 );
1215
1216 for commit in finalized_commits.iter().skip(1) {
1218 assert!(commit.rejected_transactions_by_block.is_empty());
1219 }
1220
1221 assert!(fixture.commit_finalizer.is_empty());
1223 }
1224
1225 #[tokio::test]
1227 async fn test_indirect_reject_with_gc() {
1228 let mut fixture = create_commit_finalizer_fixture();
1229 assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1230
1231 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1233 dag_builder
1234 .layer(1)
1235 .num_transactions(10)
1236 .build()
1237 .persist_layers(fixture.dag_state.clone());
1238 let round_1_blocks = dag_builder.all_blocks();
1239 fixture
1240 .transaction_certifier
1241 .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1242
1243 let block_with_rejected_txn = round_1_blocks[3].clone();
1245 let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1252 let round_2_blocks = vec![
1253 create_block(2, 0, ancestors.clone(), 0, vec![]),
1254 create_block(
1255 2,
1256 1,
1257 ancestors.clone(),
1258 0,
1259 vec![BlockTransactionVotes {
1260 block_ref: block_with_rejected_txn.reference(),
1261 rejects: vec![1],
1262 }],
1263 ),
1264 create_block(2, 2, ancestors.clone(), 0, vec![]),
1265 create_block(2, 3, ancestors.clone(), 0, vec![]),
1266 ];
1267 fixture.add_blocks(round_2_blocks.clone());
1268
1269 let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1272 .iter()
1273 .enumerate()
1274 .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1275 .collect();
1276 for r in 3..=6 {
1277 let ancestors: Vec<BlockRef> =
1278 last_round_blocks.iter().map(|b| b.reference()).collect();
1279 last_round_blocks = [0, 1, 3]
1280 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1281 .to_vec();
1282 fixture.add_blocks(last_round_blocks.clone());
1283 }
1284
1285 let mut leaders = vec![];
1287 for r in 7..=10 {
1288 let mut ancestors: Vec<BlockRef> =
1289 last_round_blocks.iter().map(|b| b.reference()).collect();
1290 last_round_blocks = (0..4)
1291 .map(|i| {
1292 if r == 7 && i == 2 {
1293 ancestors.push(round_2_blocks[2].reference());
1295 }
1296 create_block(r, i, ancestors.clone(), 0, vec![])
1297 })
1298 .collect();
1299 leaders.push(last_round_blocks[0].clone());
1300 fixture.add_blocks(last_round_blocks.clone());
1301 }
1302
1303 assert_eq!(leaders.len(), 4);
1305 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1306 assert_eq!(committed_sub_dags.len(), 4);
1307
1308 assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1310 for commit in committed_sub_dags.iter() {
1312 assert!(!commit.blocks.contains(&round_2_blocks[2]));
1313 }
1314
1315 for commit in committed_sub_dags.iter().take(3) {
1317 assert!(commit.decided_with_local_blocks);
1318 let finalized_commits = fixture
1319 .commit_finalizer
1320 .process_commit(commit.clone())
1321 .await;
1322 assert_eq!(finalized_commits.len(), 0);
1323 }
1324
1325 let finalized_commits = fixture
1327 .commit_finalizer
1328 .process_commit(committed_sub_dags[3].clone())
1329 .await;
1330 assert_eq!(finalized_commits.len(), 4);
1331
1332 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1336 assert_eq!(rejected_transactions.len(), 1);
1337 assert_eq!(
1338 rejected_transactions
1339 .get(&block_with_rejected_txn.reference())
1340 .unwrap(),
1341 &vec![1]
1342 );
1343
1344 for commit in finalized_commits.iter().skip(1) {
1346 assert!(commit.rejected_transactions_by_block.is_empty());
1347 }
1348
1349 assert!(fixture.commit_finalizer.is_empty());
1351 }
1352
1353 #[tokio::test]
1354 async fn test_finalize_remote_commits_with_reject_votes() {
1355 let mut fixture: Fixture = create_commit_finalizer_fixture();
1356 let mut all_blocks = vec![];
1357
1358 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1360 dag_builder.layer(1).num_transactions(10).build();
1361 let round_1_blocks = dag_builder.all_blocks();
1362 all_blocks.push(round_1_blocks.clone());
1363
1364 let mut leaders = vec![round_1_blocks[0].clone()];
1366
1367 let mut last_round_blocks = round_1_blocks.clone();
1369 for r in 2..=9 {
1370 let ancestors: Vec<BlockRef> =
1371 last_round_blocks.iter().map(|b| b.reference()).collect();
1372 let round_blocks: Vec<_> = (0..4)
1373 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1374 .collect();
1375 all_blocks.push(round_blocks.clone());
1376 if r <= 7 && r != 5 {
1377 leaders.push(round_blocks[r as usize % 4].clone());
1378 }
1379 last_round_blocks = round_blocks;
1380 }
1381
1382 assert_eq!(leaders.len(), 6);
1384
1385 async fn add_blocks_and_process_commit(
1386 fixture: &mut Fixture,
1387 leaders: &[VerifiedBlock],
1388 all_blocks: &[Vec<VerifiedBlock>],
1389 index: usize,
1390 local: bool,
1391 ) -> Vec<CommittedSubDag> {
1392 let leader = leaders[index].clone();
1393 if local {
1395 for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1396 fixture.add_blocks(round_blocks.clone());
1397 }
1398 } else {
1399 for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1400 fixture.add_blocks(round_blocks.clone());
1401 }
1402 };
1403 let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1405 assert_eq!(committed_sub_dags.len(), 1);
1406 let mut remote_commit = committed_sub_dags.pop().unwrap();
1407 remote_commit.decided_with_local_blocks = local;
1408 fixture
1410 .commit_finalizer
1411 .process_commit(remote_commit.clone())
1412 .await
1413 }
1414
1415 for i in 0..3 {
1417 let finalized_commits =
1418 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1419 assert!(finalized_commits.is_empty());
1420 }
1421
1422 let finalized_commits =
1424 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1425 assert_eq!(finalized_commits.len(), 1);
1426 assert_eq!(finalized_commits[0].commit_ref.index, 1);
1427 assert_eq!(finalized_commits[0].leader.round, 1);
1428
1429 let finalized_commits =
1431 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1432 assert_eq!(finalized_commits.len(), 2);
1433 assert_eq!(finalized_commits[0].commit_ref.index, 2);
1434 assert_eq!(finalized_commits[0].leader.round, 2);
1435 assert_eq!(finalized_commits[1].commit_ref.index, 3);
1436 assert_eq!(finalized_commits[1].leader.round, 3);
1437
1438 let finalized_commits =
1440 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1441 assert_eq!(finalized_commits.len(), 3);
1442 assert_eq!(finalized_commits[0].commit_ref.index, 4);
1443 assert_eq!(finalized_commits[0].leader.round, 4);
1444 assert_eq!(finalized_commits[1].commit_ref.index, 5);
1445 assert_eq!(finalized_commits[1].leader.round, 6);
1446 assert_eq!(finalized_commits[2].commit_ref.index, 6);
1447 assert_eq!(finalized_commits[2].leader.round, 7);
1448
1449 assert!(fixture.commit_finalizer.is_empty());
1451 }
1452}