1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12 monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13 monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19 BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20 commit::DEFAULT_WAVE_LENGTH,
21 context::Context,
22 dag_state::DagState,
23 error::{ConsensusError, ConsensusResult},
24 stake_aggregator::{QuorumThreshold, StakeAggregator},
25 transaction_certifier::TransactionCertifier,
26};
27
28pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33pub(crate) struct CommitFinalizerHandle {
35 sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39 pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41 self.sender.send(commit).map_err(|e| {
42 tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43 ConsensusError::Shutdown
44 })
45 }
46}
47
48pub struct CommitFinalizer {
70 context: Arc<Context>,
71 dag_state: Arc<RwLock<DagState>>,
72 transaction_certifier: TransactionCertifier,
73 commit_sender: UnboundedSender<CommittedSubDag>,
74
75 last_processed_commit: Option<CommitIndex>,
77 pending_commits: VecDeque<CommitState>,
79 blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
81}
82
83impl CommitFinalizer {
84 pub fn new(
85 context: Arc<Context>,
86 dag_state: Arc<RwLock<DagState>>,
87 transaction_certifier: TransactionCertifier,
88 commit_sender: UnboundedSender<CommittedSubDag>,
89 ) -> Self {
90 Self {
91 context,
92 dag_state,
93 transaction_certifier,
94 commit_sender,
95 last_processed_commit: None,
96 pending_commits: VecDeque::new(),
97 blocks: Arc::new(RwLock::new(BTreeMap::new())),
98 }
99 }
100
101 pub(crate) fn start(
102 context: Arc<Context>,
103 dag_state: Arc<RwLock<DagState>>,
104 transaction_certifier: TransactionCertifier,
105 commit_sender: UnboundedSender<CommittedSubDag>,
106 ) -> CommitFinalizerHandle {
107 let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108 let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109 let _handle =
110 spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111 CommitFinalizerHandle { sender }
112 }
113
114 async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115 while let Some(committed_sub_dag) = receiver.recv().await {
116 let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117 || committed_sub_dag.recovered_rejected_transactions;
118 let finalized_commits = if !already_finalized {
119 self.process_commit(committed_sub_dag).await
120 } else {
121 vec![committed_sub_dag]
122 };
123 if !finalized_commits.is_empty() {
124 self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128 let mut dag_state = self.dag_state.write();
129 if !already_finalized {
130 for commit in &finalized_commits {
132 dag_state.add_finalized_commit(
133 commit.commit_ref,
134 commit.rejected_transactions_by_block.clone(),
135 );
136 }
137 }
138 dag_state.flush();
143 }
144 for commit in finalized_commits {
145 if let Err(e) = self.commit_sender.send(commit) {
146 tracing::warn!(
147 "Failed to send to commit handler, probably due to shutdown: {e:?}"
148 );
149 return;
150 }
151 }
152 }
153 }
154
155 pub async fn process_commit(
156 &mut self,
157 committed_sub_dag: CommittedSubDag,
158 ) -> Vec<CommittedSubDag> {
159 let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161 if let Some(last_processed_commit) = self.last_processed_commit {
162 assert_eq!(
163 last_processed_commit + 1,
164 committed_sub_dag.commit_ref.index
165 );
166 }
167 self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169 self.pending_commits
170 .push_back(CommitState::new(committed_sub_dag));
171
172 let mut finalized_commits = vec![];
173
174 for i in 0..self.pending_commits.len() {
193 let commit_state = &self.pending_commits[i];
194 if commit_state.pending_blocks.is_empty() {
195 continue;
197 }
198 if !commit_state.commit.decided_with_local_blocks {
203 let last_commit_state = self.pending_commits.back().unwrap();
204 if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205 > last_commit_state.commit.leader.round
206 {
207 break;
208 }
209 }
210 self.try_direct_finalize_commit(i);
211 }
212 let direct_finalized_commits = self.pop_finalized_commits();
213 self.context
214 .metrics
215 .node_metrics
216 .finalizer_output_commits
217 .with_label_values(&["direct"])
218 .inc_by(direct_finalized_commits.len() as u64);
219 finalized_commits.extend(direct_finalized_commits);
220
221 if !self.pending_commits.is_empty() {
224 self.link_blocks_in_last_commit();
231 self.append_origin_descendants_from_last_commit();
232 while self.pending_commits.len() > 1 {
236 if !self.pending_commits[0].pending_blocks.is_empty() {
239 break;
240 }
241 self.try_indirect_finalize_first_commit().await;
243 let indirect_finalized_commits = self.pop_finalized_commits();
244 if indirect_finalized_commits.is_empty() {
245 break;
247 }
248 self.context
249 .metrics
250 .node_metrics
251 .finalizer_output_commits
252 .with_label_values(&["indirect"])
253 .inc_by(indirect_finalized_commits.len() as u64);
254 finalized_commits.extend(indirect_finalized_commits);
255 }
256 }
257
258 self.context
259 .metrics
260 .node_metrics
261 .finalizer_buffered_commits
262 .set(self.pending_commits.len() as i64);
263
264 finalized_commits
265 }
266
267 fn try_direct_finalize_commit(&mut self, index: usize) {
269 let num_commits = self.pending_commits.len();
270 let commit_state = self
271 .pending_commits
272 .get_mut(index)
273 .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274 assert!(!commit_state.pending_blocks.is_empty());
277
278 let metrics = &self.context.metrics.node_metrics;
279 let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280 for (block_ref, num_transactions) in pending_blocks {
281 let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283 metrics
284 .finalizer_transaction_status
285 .with_label_values(&["direct_finalize"])
286 .inc_by((num_transactions - reject_votes.len()) as u64);
287 let hostname = &self.context.committee.authority(block_ref.author).hostname;
288 metrics
289 .finalizer_reject_votes
290 .with_label_values(&[hostname])
291 .inc_by(reject_votes.len() as u64);
292 for (transaction_index, stake) in reject_votes {
295 let entry = if stake < self.context.committee.quorum_threshold() {
298 commit_state
299 .pending_transactions
300 .entry(block_ref)
301 .or_default()
302 } else {
303 metrics
304 .finalizer_transaction_status
305 .with_label_values(&["direct_reject"])
306 .inc();
307 commit_state
308 .rejected_transactions
309 .entry(block_ref)
310 .or_default()
311 };
312 entry.insert(transaction_index);
313 }
314 }
315 }
316
317 fn link_blocks_in_last_commit(&mut self) {
320 let commit_state = self
321 .pending_commits
322 .back_mut()
323 .unwrap_or_else(|| panic!("No pending commit."));
324
325 let mut blocks = commit_state.commit.blocks.clone();
328 blocks.sort_by_key(|b| b.round());
329
330 let mut blocks_map = self.blocks.write();
331 for block in blocks {
332 let block_ref = block.reference();
333 for ancestor in block.ancestors() {
335 if let Some(ancestor_block) = blocks_map.get(ancestor) {
338 ancestor_block.write().children.insert(block_ref);
339 }
340 }
341 blocks_map
343 .entry(block_ref)
344 .or_insert_with(|| RwLock::new(BlockState::new(block)));
345 }
346 }
347
348 fn append_origin_descendants_from_last_commit(&mut self) {
366 let commit_state = self
367 .pending_commits
368 .back_mut()
369 .unwrap_or_else(|| panic!("No pending commit."));
370 let mut committed_blocks = commit_state.commit.blocks.clone();
371 committed_blocks.sort_by_key(|b| b.round());
372 let blocks_map = self.blocks.read();
373 for committed_block in committed_blocks {
374 let committed_block_ref = committed_block.reference();
375 let mut origin_ancestor_ref = *blocks_map
379 .get(&committed_block_ref)
380 .unwrap()
381 .read()
382 .block
383 .ancestors()
384 .first()
385 .unwrap();
386 while origin_ancestor_ref.author == committed_block_ref.author {
387 let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
388 break;
389 };
390 origin_ancestor_block
391 .write()
392 .origin_descendants
393 .push(committed_block_ref);
394 origin_ancestor_ref = *origin_ancestor_block
395 .read()
396 .block
397 .ancestors()
398 .first()
399 .unwrap();
400 }
401 }
402 }
403
404 async fn try_indirect_finalize_first_commit(&mut self) {
406 assert!(!self.pending_commits.is_empty());
408 assert!(self.pending_commits[0].pending_blocks.is_empty());
409
410 self.check_pending_transactions_in_first_commit();
412
413 self.try_indirect_finalize_pending_transactions_in_first_commit()
415 .await;
416
417 self.try_indirect_reject_pending_transactions_in_first_commit();
419 }
420
421 fn check_pending_transactions_in_first_commit(&mut self) {
422 let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
423
424 for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
426 let reject_votes: BTreeMap<TransactionIndex, Stake> = self
427 .transaction_certifier
428 .get_reject_votes(block_ref)
429 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
430 .into_iter()
431 .collect();
432 let mut rejected_transactions = vec![];
433 for &transaction_index in pending_transactions {
434 let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
436 if reject_stake < self.context.committee.quorum_threshold() {
437 continue;
439 }
440 rejected_transactions.push(transaction_index);
442 }
443 if !rejected_transactions.is_empty() {
444 all_rejected_transactions.push((*block_ref, rejected_transactions));
445 }
446 }
447
448 for (block_ref, rejected_transactions) in all_rejected_transactions {
450 self.context
451 .metrics
452 .node_metrics
453 .finalizer_transaction_status
454 .with_label_values(&["direct_late_reject"])
455 .inc_by(rejected_transactions.len() as u64);
456 let curr_commit_state = &mut self.pending_commits[0];
457 curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
458 curr_commit_state
459 .rejected_transactions
460 .entry(block_ref)
461 .or_default()
462 .extend(rejected_transactions);
463 }
464 }
465
466 async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
467 let _scope = monitored_scope(
468 "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
469 );
470
471 let pending_blocks: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = self.pending_commits[0]
472 .pending_transactions
473 .iter()
474 .map(|(k, v)| (*k, v.clone()))
475 .collect();
476
477 const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
479
480 let mut all_finalized_transactions = vec![];
482 let mut join_set = JoinSet::new();
483 for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
486 let context = self.context.clone();
487 let blocks = self.blocks.clone();
488 let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
489
490 join_set.spawn(tokio::task::spawn_blocking(move || {
491 let mut chunk_results = Vec::new();
492
493 for (block_ref, pending_transactions) in chunk {
494 let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
495 &context,
496 &blocks,
497 block_ref,
498 pending_transactions,
499 );
500
501 if !finalized.is_empty() {
502 chunk_results.push((block_ref, finalized));
503 }
504 }
505
506 chunk_results
507 }));
508 }
509
510 while let Some(result) = join_set.join_next().await {
512 let e = match result {
513 Ok(blocking_result) => match blocking_result {
514 Ok(chunk_results) => {
515 all_finalized_transactions.extend(chunk_results);
516 continue;
517 }
518 Err(e) => e,
519 },
520 Err(e) => e,
521 };
522 if e.is_panic() {
523 std::panic::resume_unwind(e.into_panic());
524 }
525 tracing::info!("Process likely shutting down: {:?}", e);
526 return;
528 }
529
530 for (block_ref, finalized_transactions) in all_finalized_transactions {
531 self.context
532 .metrics
533 .node_metrics
534 .finalizer_transaction_status
535 .with_label_values(&["indirect_finalize"])
536 .inc_by(finalized_transactions.len() as u64);
537 self.pending_commits[0]
539 .remove_pending_transactions(&block_ref, &finalized_transactions);
540 }
541 }
542
543 fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
544 let curr_leader_round = self.pending_commits[0].commit.leader.round;
545 let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
546 if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
547 let curr_commit_state = &mut self.pending_commits[0];
548 assert!(curr_commit_state.pending_blocks.is_empty());
552 let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
556 for (block_ref, pending_transactions) in pending_transactions {
557 self.context
558 .metrics
559 .node_metrics
560 .finalizer_transaction_status
561 .with_label_values(&["indirect_reject"])
562 .inc_by(pending_transactions.len() as u64);
563 curr_commit_state
564 .rejected_transactions
565 .entry(block_ref)
566 .or_default()
567 .extend(pending_transactions);
568 }
569 }
570 }
571
572 fn try_indirect_finalize_pending_transactions_in_block(
576 context: &Arc<Context>,
577 blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
578 pending_block_ref: BlockRef,
579 pending_transactions: BTreeSet<TransactionIndex>,
580 ) -> Vec<TransactionIndex> {
581 if pending_transactions.is_empty() {
582 return vec![];
583 }
584 let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
585 pending_transactions
586 .into_iter()
587 .map(|transaction_index| (transaction_index, StakeAggregator::new()))
588 .collect();
589 let mut finalized_transactions = vec![];
590 let blocks_map = blocks.read();
591 let mut to_visit_blocks = blocks_map
593 .get(&pending_block_ref)
594 .unwrap()
595 .read()
596 .children
597 .clone();
598 let mut visited = BTreeSet::new();
600 let mut ignored = BTreeSet::new();
602 while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
604 if !visited.insert(curr_block_ref) {
605 continue;
606 }
607 let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
608 if ignored.insert(curr_block_ref) {
610 ignored.extend(curr_block_state.origin_descendants.iter());
617 let curr_block_reject_votes = curr_block_state
619 .reject_votes
620 .get(&pending_block_ref)
621 .cloned()
622 .unwrap_or_default();
623 let mut newly_finalized = vec![];
625 for (index, stake) in &mut accept_votes {
626 if curr_block_reject_votes.contains(index) {
628 continue;
629 }
630 if !stake.add(curr_block_ref.author, &context.committee) {
632 continue;
633 }
634 newly_finalized.push(*index);
635 finalized_transactions.push(*index);
636 }
637 for index in newly_finalized {
639 accept_votes.remove(&index);
640 }
641 if accept_votes.is_empty() {
643 break;
644 }
645 }
646 to_visit_blocks.extend(
648 curr_block_state
649 .children
650 .iter()
651 .filter(|b| !visited.contains(*b)),
652 );
653 }
654 finalized_transactions
655 }
656
657 fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
658 let mut finalized_commits = vec![];
659
660 while let Some(commit_state) = self.pending_commits.front() {
661 if !commit_state.pending_blocks.is_empty()
662 || !commit_state.pending_transactions.is_empty()
663 {
664 break;
666 }
667
668 let commit_state = self.pending_commits.pop_front().unwrap();
670 let mut commit = commit_state.commit;
671 for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
672 commit
673 .rejected_transactions_by_block
674 .insert(block_ref, rejected_transactions.into_iter().collect());
675 }
676
677 let mut blocks_map = self.blocks.write();
679 for block in commit.blocks.iter() {
680 blocks_map.remove(&block.reference());
681 }
682
683 let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
684 last_commit_state.commit.leader.round - commit.leader.round
685 } else {
686 0
687 };
688 self.context
689 .metrics
690 .node_metrics
691 .finalizer_round_delay
692 .observe(round_delay as f64);
693
694 finalized_commits.push(commit);
695 }
696
697 finalized_commits
698 }
699
700 fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
701 let gc_round = self
704 .dag_state
705 .read()
706 .calculate_gc_round(last_finalized_commit_round);
707 self.transaction_certifier.run_gc(gc_round);
708 }
709
710 #[cfg(test)]
711 fn is_empty(&self) -> bool {
712 self.pending_commits.is_empty() && self.blocks.read().is_empty()
713 }
714}
715
716struct CommitState {
717 commit: CommittedSubDag,
718 pending_blocks: BTreeMap<BlockRef, usize>,
722 pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
727 rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
729}
730
731impl CommitState {
732 fn new(commit: CommittedSubDag) -> Self {
733 let pending_blocks: BTreeMap<_, _> = commit
734 .blocks
735 .iter()
736 .map(|b| (b.reference(), b.transactions().len()))
737 .collect();
738 assert!(!pending_blocks.is_empty());
739 Self {
740 commit,
741 pending_blocks,
742 pending_transactions: BTreeMap::new(),
743 rejected_transactions: BTreeMap::new(),
744 }
745 }
746
747 fn remove_pending_transactions(
748 &mut self,
749 block_ref: &BlockRef,
750 transactions: &[TransactionIndex],
751 ) {
752 let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
753 return;
754 };
755 for t in transactions {
756 block_pending_txns.remove(t);
757 }
758 if block_pending_txns.is_empty() {
759 self.pending_transactions.remove(block_ref);
760 }
761 }
762}
763
764struct BlockState {
765 block: VerifiedBlock,
767 children: BTreeSet<BlockRef>,
769 reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
771 origin_descendants: Vec<BlockRef>,
773}
774
775impl BlockState {
776 fn new(block: VerifiedBlock) -> Self {
777 let reject_votes: BTreeMap<_, _> = block
778 .transaction_votes()
779 .iter()
780 .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
781 .collect();
782 let origin_descendants = Vec::with_capacity(8);
785 Self {
786 block,
787 children: BTreeSet::new(),
788 reject_votes,
789 origin_descendants,
790 }
791 }
792}
793
794#[cfg(test)]
795mod tests {
796 use mysten_metrics::monitored_mpsc;
797 use parking_lot::RwLock;
798
799 use crate::{
800 TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
801 dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
802 test_dag_builder::DagBuilder,
803 };
804
805 use super::*;
806
807 struct Fixture {
808 context: Arc<Context>,
809 dag_state: Arc<RwLock<DagState>>,
810 transaction_certifier: TransactionCertifier,
811 linearizer: Linearizer,
812 commit_finalizer: CommitFinalizer,
813 }
814
815 impl Fixture {
816 fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
817 self.transaction_certifier
818 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
819 self.dag_state.write().accept_blocks(blocks);
820 }
821 }
822
823 fn create_commit_finalizer_fixture() -> Fixture {
824 let (context, _keys) = Context::new_for_test(4);
825 let context = Arc::new(context);
826 let dag_state = Arc::new(RwLock::new(DagState::new(
827 context.clone(),
828 Arc::new(MemStore::new()),
829 )));
830 let linearizer = Linearizer::new(context.clone(), dag_state.clone());
831 let (blocks_sender, _blocks_receiver) =
832 monitored_mpsc::unbounded_channel("consensus_block_output");
833 let transaction_certifier = TransactionCertifier::new(
834 context.clone(),
835 Arc::new(NoopBlockVerifier {}),
836 dag_state.clone(),
837 blocks_sender,
838 );
839 let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
840 let commit_finalizer = CommitFinalizer::new(
841 context.clone(),
842 dag_state.clone(),
843 transaction_certifier.clone(),
844 commit_sender,
845 );
846 Fixture {
847 context,
848 dag_state,
849 transaction_certifier,
850 linearizer,
851 commit_finalizer,
852 }
853 }
854
855 fn create_block(
856 round: Round,
857 authority: u32,
858 mut ancestors: Vec<BlockRef>,
859 num_transactions: usize,
860 reject_votes: Vec<BlockTransactionVotes>,
861 ) -> VerifiedBlock {
862 let i = ancestors
864 .iter()
865 .position(|b| b.author.value() == authority as usize)
866 .unwrap_or_else(|| {
867 panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
868 });
869 let b = ancestors.remove(i);
870 ancestors.insert(0, b);
871 let block = TestBlock::new(round, authority)
873 .set_ancestors(ancestors)
874 .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
875 .set_transaction_votes(reject_votes)
876 .build();
877 VerifiedBlock::new_for_test(block)
878 }
879
880 #[tokio::test]
881 async fn test_direct_finalize_no_reject_votes() {
882 let mut fixture = create_commit_finalizer_fixture();
883
884 let mut dag_builder = DagBuilder::new(fixture.context.clone());
886 dag_builder
887 .layers(1..=4)
888 .num_transactions(10)
889 .build()
890 .persist_layers(fixture.dag_state.clone());
891 let blocks = dag_builder.all_blocks();
892 fixture
893 .transaction_certifier
894 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
895
896 let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
898 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
899 assert_eq!(committed_sub_dags.len(), 1);
900 let committed_sub_dag = &committed_sub_dags[0];
901
902 let finalized_commits = fixture
904 .commit_finalizer
905 .process_commit(committed_sub_dag.clone())
906 .await;
907 assert_eq!(finalized_commits.len(), 1);
908 let finalized_commit = &finalized_commits[0];
909 assert_eq!(committed_sub_dag, finalized_commit);
910
911 assert!(fixture.commit_finalizer.is_empty());
913 }
914
915 #[tokio::test]
918 async fn test_direct_finalize_with_reject_votes() {
919 let mut fixture = create_commit_finalizer_fixture();
920
921 let mut dag_builder = DagBuilder::new(fixture.context.clone());
923 dag_builder
924 .layer(1)
925 .num_transactions(10)
926 .build()
927 .persist_layers(fixture.dag_state.clone());
928 let round_1_blocks = dag_builder.all_blocks();
929 fixture.transaction_certifier.add_voted_blocks(
930 round_1_blocks
931 .iter()
932 .map(|b| {
933 if b.author().value() != 3 {
934 (b.clone(), vec![])
935 } else {
936 (b.clone(), vec![0, 3])
937 }
938 })
939 .collect(),
940 );
941
942 let block_with_rejected_txn = round_1_blocks[3].clone();
944 let reject_vote = BlockTransactionVotes {
945 block_ref: block_with_rejected_txn.reference(),
946 rejects: vec![0, 3],
947 };
948
949 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
951 let round_2_blocks = vec![
953 create_block(
954 2,
955 0,
956 round_1_blocks.iter().map(|b| b.reference()).collect(),
957 10,
958 vec![reject_vote.clone()],
959 ),
960 create_block(2, 1, ancestors.clone(), 10, vec![]),
961 create_block(2, 2, ancestors.clone(), 10, vec![]),
962 ];
963 fixture.add_blocks(round_2_blocks.clone());
964
965 let leader = round_2_blocks[0].clone();
967 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
968 assert_eq!(committed_sub_dags.len(), 1);
969 let committed_sub_dag = &committed_sub_dags[0];
970 assert_eq!(committed_sub_dag.blocks.len(), 5);
971
972 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
974 let round_3_blocks = vec![
975 create_block(3, 0, ancestors.clone(), 0, vec![]),
976 create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
977 create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
978 create_block(
979 3,
980 3,
981 std::iter::once(round_1_blocks[3].reference())
982 .chain(ancestors.clone())
983 .collect(),
984 0,
985 vec![reject_vote.clone()],
986 ),
987 ];
988 fixture.add_blocks(round_3_blocks.clone());
989
990 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
992 let round_4_blocks = vec![
993 create_block(4, 0, ancestors.clone(), 0, vec![]),
994 create_block(4, 1, ancestors.clone(), 0, vec![]),
995 create_block(4, 2, ancestors.clone(), 0, vec![]),
996 create_block(4, 3, ancestors.clone(), 0, vec![]),
997 ];
998 fixture.add_blocks(round_4_blocks.clone());
999
1000 let finalized_commits = fixture
1003 .commit_finalizer
1004 .process_commit(committed_sub_dag.clone())
1005 .await;
1006 assert_eq!(finalized_commits.len(), 1);
1007 let finalized_commit = &finalized_commits[0];
1008 assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1009 assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1010 assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1011 assert_eq!(
1012 finalized_commit
1013 .rejected_transactions_by_block
1014 .get(&block_with_rejected_txn.reference())
1015 .unwrap()
1016 .clone(),
1017 vec![0, 3],
1018 );
1019
1020 assert!(fixture.commit_finalizer.is_empty());
1022 }
1023
1024 #[tokio::test]
1029 async fn test_indirect_finalize_with_reject_votes() {
1030 let mut fixture = create_commit_finalizer_fixture();
1031
1032 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1034 dag_builder
1035 .layer(1)
1036 .num_transactions(10)
1037 .build()
1038 .persist_layers(fixture.dag_state.clone());
1039 let round_1_blocks = dag_builder.all_blocks();
1040 fixture.transaction_certifier.add_voted_blocks(
1041 round_1_blocks
1042 .iter()
1043 .map(|b| {
1044 if b.author().value() != 3 {
1045 (b.clone(), vec![])
1046 } else {
1047 (b.clone(), vec![0, 3])
1048 }
1049 })
1050 .collect(),
1051 );
1052
1053 let block_with_rejected_txn = round_1_blocks[3].clone();
1055 let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1062 let round_2_blocks = vec![
1064 create_block(
1065 2,
1066 0,
1067 round_1_blocks.iter().map(|b| b.reference()).collect(),
1068 10,
1069 vec![BlockTransactionVotes {
1070 block_ref: block_with_rejected_txn.reference(),
1071 rejects: vec![1, 4],
1072 }],
1073 ),
1074 create_block(2, 1, ancestors.clone(), 10, vec![]),
1076 create_block(2, 2, ancestors.clone(), 10, vec![]),
1077 ];
1078 fixture.add_blocks(round_2_blocks.clone());
1079
1080 let mut leaders = vec![round_2_blocks[0].clone()];
1082
1083 let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1085 let round_3_blocks = vec![
1086 create_block(3, 0, ancestors.clone(), 0, vec![]),
1087 create_block(
1088 3,
1089 1,
1090 ancestors.clone(),
1091 0,
1092 vec![BlockTransactionVotes {
1093 block_ref: block_with_rejected_txn.reference(),
1094 rejects: vec![1, 4, 7],
1095 }],
1096 ),
1097 create_block(
1098 3,
1099 3,
1100 std::iter::once(round_1_blocks[3].reference())
1101 .chain(ancestors.clone())
1102 .collect(),
1103 0,
1104 vec![],
1105 ),
1106 ];
1107 fixture.add_blocks(round_3_blocks.clone());
1108 leaders.push(round_3_blocks[2].clone());
1109
1110 let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1112 let round_4_blocks = vec![
1113 create_block(4, 0, ancestors.clone(), 0, vec![]),
1114 create_block(4, 1, ancestors.clone(), 0, vec![]),
1115 create_block(
1116 4,
1117 2,
1118 std::iter::once(round_2_blocks[2].reference())
1119 .chain(ancestors.clone())
1120 .collect(),
1121 0,
1122 vec![BlockTransactionVotes {
1123 block_ref: block_with_rejected_txn.reference(),
1124 rejects: vec![1],
1125 }],
1126 ),
1127 create_block(4, 3, ancestors.clone(), 0, vec![]),
1128 ];
1129 fixture.add_blocks(round_4_blocks.clone());
1130 leaders.push(round_4_blocks[1].clone());
1131
1132 let mut last_round_blocks = round_4_blocks.clone();
1136 for r in 5..=7 {
1137 let ancestors: Vec<BlockRef> =
1138 last_round_blocks.iter().map(|b| b.reference()).collect();
1139 let round_blocks: Vec<_> = (0..4)
1140 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1141 .collect();
1142 fixture.add_blocks(round_blocks.clone());
1143 if r == 5 {
1144 leaders.push(round_blocks[0].clone());
1145 }
1146 last_round_blocks = round_blocks;
1147 }
1148
1149 assert_eq!(leaders.len(), 4);
1151 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1152 assert_eq!(committed_sub_dags.len(), 4);
1153
1154 for commit in committed_sub_dags.iter().take(3) {
1156 let finalized_commits = fixture
1157 .commit_finalizer
1158 .process_commit(commit.clone())
1159 .await;
1160 assert_eq!(finalized_commits.len(), 0);
1161 }
1162
1163 let finalized_commits = fixture
1165 .commit_finalizer
1166 .process_commit(committed_sub_dags[3].clone())
1167 .await;
1168 assert_eq!(finalized_commits.len(), 4);
1169
1170 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1172 assert_eq!(rejected_transactions.len(), 1);
1173 assert_eq!(
1174 rejected_transactions
1175 .get(&block_with_rejected_txn.reference())
1176 .unwrap(),
1177 &vec![1, 4]
1178 );
1179
1180 for commit in finalized_commits.iter().skip(1) {
1182 assert!(commit.rejected_transactions_by_block.is_empty());
1183 }
1184
1185 assert!(fixture.commit_finalizer.is_empty());
1187 }
1188
1189 #[tokio::test]
1190 async fn test_finalize_remote_commits_with_reject_votes() {
1191 let mut fixture: Fixture = create_commit_finalizer_fixture();
1192 let mut all_blocks = vec![];
1193
1194 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1196 dag_builder.layer(1).num_transactions(10).build();
1197 let round_1_blocks = dag_builder.all_blocks();
1198 all_blocks.push(round_1_blocks.clone());
1199
1200 let mut leaders = vec![round_1_blocks[0].clone()];
1202
1203 let mut last_round_blocks = round_1_blocks.clone();
1205 for r in 2..=9 {
1206 let ancestors: Vec<BlockRef> =
1207 last_round_blocks.iter().map(|b| b.reference()).collect();
1208 let round_blocks: Vec<_> = (0..4)
1209 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1210 .collect();
1211 all_blocks.push(round_blocks.clone());
1212 if r <= 7 && r != 5 {
1213 leaders.push(round_blocks[r as usize % 4].clone());
1214 }
1215 last_round_blocks = round_blocks;
1216 }
1217
1218 assert_eq!(leaders.len(), 6);
1220
1221 async fn add_blocks_and_process_commit(
1222 fixture: &mut Fixture,
1223 leaders: &[VerifiedBlock],
1224 all_blocks: &[Vec<VerifiedBlock>],
1225 index: usize,
1226 local: bool,
1227 ) -> Vec<CommittedSubDag> {
1228 let leader = leaders[index].clone();
1229 if local {
1231 for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1232 fixture.add_blocks(round_blocks.clone());
1233 }
1234 } else {
1235 for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1236 fixture.add_blocks(round_blocks.clone());
1237 }
1238 };
1239 let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1241 assert_eq!(committed_sub_dags.len(), 1);
1242 let mut remote_commit = committed_sub_dags.pop().unwrap();
1243 remote_commit.decided_with_local_blocks = local;
1244 fixture
1246 .commit_finalizer
1247 .process_commit(remote_commit.clone())
1248 .await
1249 }
1250
1251 for i in 0..3 {
1253 let finalized_commits =
1254 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1255 assert!(finalized_commits.is_empty());
1256 }
1257
1258 let finalized_commits =
1260 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1261 assert_eq!(finalized_commits.len(), 1);
1262 assert_eq!(finalized_commits[0].commit_ref.index, 1);
1263 assert_eq!(finalized_commits[0].leader.round, 1);
1264
1265 let finalized_commits =
1267 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1268 assert_eq!(finalized_commits.len(), 2);
1269 assert_eq!(finalized_commits[0].commit_ref.index, 2);
1270 assert_eq!(finalized_commits[0].leader.round, 2);
1271 assert_eq!(finalized_commits[1].commit_ref.index, 3);
1272 assert_eq!(finalized_commits[1].leader.round, 3);
1273
1274 let finalized_commits =
1276 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1277 assert_eq!(finalized_commits.len(), 3);
1278 assert_eq!(finalized_commits[0].commit_ref.index, 4);
1279 assert_eq!(finalized_commits[0].leader.round, 4);
1280 assert_eq!(finalized_commits[1].commit_ref.index, 5);
1281 assert_eq!(finalized_commits[1].leader.round, 6);
1282 assert_eq!(finalized_commits[2].commit_ref.index, 6);
1283 assert_eq!(finalized_commits[2].leader.round, 7);
1284
1285 assert!(fixture.commit_finalizer.is_empty());
1287 }
1288}