1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12 monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13 monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19 BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20 commit::DEFAULT_WAVE_LENGTH,
21 context::Context,
22 dag_state::DagState,
23 error::{ConsensusError, ConsensusResult},
24 stake_aggregator::{QuorumThreshold, StakeAggregator},
25 transaction_certifier::TransactionCertifier,
26};
27
/// Leader-round distance after which still-pending transactions are rejected indirectly.
///
/// A pending commit whose leader round plus this value is less than or equal to the leader
/// round of the newest pending commit has all of its remaining pending transactions moved to
/// the rejected set (see `try_indirect_reject_pending_transactions_in_first_commit`).
pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33pub(crate) struct CommitFinalizerHandle {
35 sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39 pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41 self.sender.send(commit).map_err(|e| {
42 tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43 ConsensusError::Shutdown
44 })
45 }
46}
47
/// Finalizes committed sub-DAGs: decides which transactions in each commit are rejected and
/// forwards commits to `commit_sender` once no block or transaction in them is still pending.
pub struct CommitFinalizer {
    context: Arc<Context>,
    dag_state: Arc<RwLock<DagState>>,
    // Source of per-block reject votes; also garbage collected from try_update_gc_round().
    transaction_certifier: TransactionCertifier,
    // Output channel for finalized commits.
    commit_sender: UnboundedSender<CommittedSubDag>,

    // Index of the most recent commit handed to process_commit(), or None before the first one.
    last_processed_commit: Option<CommitIndex>,
    // Commits that still have pending blocks or pending transactions, oldest first.
    pending_commits: VecDeque<CommitState>,
    // State of the blocks belonging to pending commits. Entries are added when a commit is linked
    // and removed when the commit is popped as finalized.
    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
}
82
83impl CommitFinalizer {
84 pub fn new(
85 context: Arc<Context>,
86 dag_state: Arc<RwLock<DagState>>,
87 transaction_certifier: TransactionCertifier,
88 commit_sender: UnboundedSender<CommittedSubDag>,
89 ) -> Self {
90 Self {
91 context,
92 dag_state,
93 transaction_certifier,
94 commit_sender,
95 last_processed_commit: None,
96 pending_commits: VecDeque::new(),
97 blocks: Arc::new(RwLock::new(BTreeMap::new())),
98 }
99 }
100
101 pub(crate) fn start(
102 context: Arc<Context>,
103 dag_state: Arc<RwLock<DagState>>,
104 transaction_certifier: TransactionCertifier,
105 commit_sender: UnboundedSender<CommittedSubDag>,
106 ) -> CommitFinalizerHandle {
107 let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108 let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109 let _handle =
110 spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111 CommitFinalizerHandle { sender }
112 }
113
114 async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115 while let Some(committed_sub_dag) = receiver.recv().await {
116 let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117 || committed_sub_dag.recovered_rejected_transactions;
118 let finalized_commits = if !already_finalized {
119 self.process_commit(committed_sub_dag).await
120 } else {
121 vec![committed_sub_dag]
122 };
123 if !finalized_commits.is_empty() {
124 self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128 let mut dag_state = self.dag_state.write();
129 if !already_finalized {
130 for commit in &finalized_commits {
132 dag_state.add_finalized_commit(
133 commit.commit_ref,
134 commit.rejected_transactions_by_block.clone(),
135 );
136 }
137 }
138 dag_state.flush();
143 }
144 for commit in finalized_commits {
145 if let Err(e) = self.commit_sender.send(commit) {
146 tracing::warn!(
147 "Failed to send to commit handler, probably due to shutdown: {e:?}"
148 );
149 return;
150 }
151 }
152 }
153 }
154
155 pub async fn process_commit(
156 &mut self,
157 committed_sub_dag: CommittedSubDag,
158 ) -> Vec<CommittedSubDag> {
159 let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161 if let Some(last_processed_commit) = self.last_processed_commit {
162 assert_eq!(
163 last_processed_commit + 1,
164 committed_sub_dag.commit_ref.index
165 );
166 }
167 self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169 self.pending_commits
170 .push_back(CommitState::new(committed_sub_dag));
171
172 let mut finalized_commits = vec![];
173
174 for i in 0..self.pending_commits.len() {
193 let commit_state = &self.pending_commits[i];
194 if commit_state.pending_blocks.is_empty() {
195 continue;
197 }
198 if !commit_state.commit.decided_with_local_blocks {
203 let last_commit_state = self.pending_commits.back().unwrap();
204 if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205 > last_commit_state.commit.leader.round
206 {
207 break;
208 }
209 }
210 self.try_direct_finalize_commit(i);
211 }
212 let direct_finalized_commits = self.pop_finalized_commits();
213 self.context
214 .metrics
215 .node_metrics
216 .finalizer_output_commits
217 .with_label_values(&["direct"])
218 .inc_by(direct_finalized_commits.len() as u64);
219 finalized_commits.extend(direct_finalized_commits);
220
221 if !self.pending_commits.is_empty() {
224 self.link_blocks_in_last_commit();
231 self.append_origin_descendants_from_last_commit();
232 while self.pending_commits.len() > 1 {
236 if !self.pending_commits[0].pending_blocks.is_empty() {
239 break;
240 }
241 self.try_indirect_finalize_first_commit().await;
243 let indirect_finalized_commits = self.pop_finalized_commits();
244 if indirect_finalized_commits.is_empty() {
245 break;
247 }
248 self.context
249 .metrics
250 .node_metrics
251 .finalizer_output_commits
252 .with_label_values(&["indirect"])
253 .inc_by(indirect_finalized_commits.len() as u64);
254 finalized_commits.extend(indirect_finalized_commits);
255 }
256 }
257
258 self.context
259 .metrics
260 .node_metrics
261 .finalizer_buffered_commits
262 .set(self.pending_commits.len() as i64);
263
264 finalized_commits
265 }
266
267 fn try_direct_finalize_commit(&mut self, index: usize) {
269 let num_commits = self.pending_commits.len();
270 let commit_state = self
271 .pending_commits
272 .get_mut(index)
273 .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274 assert!(!commit_state.pending_blocks.is_empty());
277
278 let metrics = &self.context.metrics.node_metrics;
279 let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280 for (block_ref, num_transactions) in pending_blocks {
281 let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283 metrics
284 .finalizer_transaction_status
285 .with_label_values(&["direct_finalize"])
286 .inc_by((num_transactions - reject_votes.len()) as u64);
287 let hostname = &self.context.committee.authority(block_ref.author).hostname;
288 metrics
289 .finalizer_reject_votes
290 .with_label_values(&[hostname])
291 .inc_by(reject_votes.len() as u64);
292 for (transaction_index, stake) in reject_votes {
295 let entry = if stake < self.context.committee.quorum_threshold() {
298 commit_state
299 .pending_transactions
300 .entry(block_ref)
301 .or_default()
302 } else {
303 metrics
304 .finalizer_transaction_status
305 .with_label_values(&["direct_reject"])
306 .inc();
307 commit_state
308 .rejected_transactions
309 .entry(block_ref)
310 .or_default()
311 };
312 entry.insert(transaction_index);
313 }
314 }
315 }
316
317 fn link_blocks_in_last_commit(&mut self) {
320 let commit_state = self
321 .pending_commits
322 .back_mut()
323 .unwrap_or_else(|| panic!("No pending commit."));
324
325 let mut blocks = commit_state.commit.blocks.clone();
328 blocks.sort_by_key(|b| b.round());
329
330 let mut blocks_map = self.blocks.write();
331 for block in blocks {
332 let block_ref = block.reference();
333 for ancestor in block.ancestors() {
335 if let Some(ancestor_block) = blocks_map.get(ancestor) {
338 ancestor_block.write().children.insert(block_ref);
339 }
340 }
341 blocks_map.entry(block_ref).or_insert_with(|| {
343 RwLock::new(BlockState::new(block, commit_state.commit.commit_ref.index))
344 });
345 }
346 }
347
348 fn append_origin_descendants_from_last_commit(&mut self) {
369 let commit_state = self
370 .pending_commits
371 .back_mut()
372 .unwrap_or_else(|| panic!("No pending commit."));
373 let mut committed_blocks = commit_state.commit.blocks.clone();
374 committed_blocks.sort_by_key(|b| b.round());
375 let blocks_map = self.blocks.read();
376 for committed_block in committed_blocks {
377 let committed_block_ref = committed_block.reference();
378 let mut origin_ancestor_ref = *blocks_map
382 .get(&committed_block_ref)
383 .unwrap()
384 .read()
385 .block
386 .ancestors()
387 .first()
388 .unwrap();
389 while origin_ancestor_ref.author == committed_block_ref.author {
390 let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
391 break;
392 };
393 origin_ancestor_block
394 .write()
395 .origin_descendants
396 .push(committed_block_ref);
397 origin_ancestor_ref = *origin_ancestor_block
398 .read()
399 .block
400 .ancestors()
401 .first()
402 .unwrap();
403 }
404 }
405 }
406
/// Attempts to resolve the pending transactions of the first pending commit in three steps:
/// re-check reject votes, collect indirect accept votes, then apply the indirect reject rule.
///
/// # Panics
/// Panics if there is no pending commit or if the first pending commit still has pending blocks.
async fn try_indirect_finalize_first_commit(&mut self) {
    // There must be a first commit to work on ...
    assert!(!self.pending_commits.is_empty());
    // ... and its blocks must already have been drained by direct finalization.
    assert!(self.pending_commits[0].pending_blocks.is_empty());

    // Step 1: move pending transactions whose reject stake has reached quorum to rejected.
    self.check_pending_transactions_in_first_commit();

    // Step 2: drop pending transactions that gathered a quorum of accept votes.
    self.try_indirect_finalize_pending_transactions_in_first_commit()
        .await;

    // Step 3: reject what is still pending if the commit is INDIRECT_REJECT_DEPTH rounds behind.
    self.try_indirect_reject_pending_transactions_in_first_commit();
}
423
424 fn check_pending_transactions_in_first_commit(&mut self) {
425 let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
426
427 for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
429 let reject_votes: BTreeMap<TransactionIndex, Stake> = self
430 .transaction_certifier
431 .get_reject_votes(block_ref)
432 .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
433 .into_iter()
434 .collect();
435 let mut rejected_transactions = vec![];
436 for &transaction_index in pending_transactions {
437 let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
439 if reject_stake < self.context.committee.quorum_threshold() {
440 continue;
442 }
443 rejected_transactions.push(transaction_index);
445 }
446 if !rejected_transactions.is_empty() {
447 all_rejected_transactions.push((*block_ref, rejected_transactions));
448 }
449 }
450
451 for (block_ref, rejected_transactions) in all_rejected_transactions {
453 self.context
454 .metrics
455 .node_metrics
456 .finalizer_transaction_status
457 .with_label_values(&["direct_late_reject"])
458 .inc_by(rejected_transactions.len() as u64);
459 let curr_commit_state = &mut self.pending_commits[0];
460 curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
461 curr_commit_state
462 .rejected_transactions
463 .entry(block_ref)
464 .or_default()
465 .extend(rejected_transactions);
466 }
467 }
468
469 async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
470 let _scope = monitored_scope(
471 "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
472 );
473
474 let pending_blocks: Vec<_> = self.pending_commits[0]
475 .pending_transactions
476 .iter()
477 .map(|(k, v)| (*k, v.clone()))
478 .collect();
479
480 let gc_rounds = self
481 .pending_commits
482 .iter()
483 .map(|c| {
484 (
485 c.commit.commit_ref.index,
486 self.dag_state
487 .read()
488 .calculate_gc_round(c.commit.leader.round),
489 )
490 })
491 .collect::<Vec<_>>();
492
493 const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
495
496 let mut all_finalized_transactions = vec![];
498 let mut join_set = JoinSet::new();
499 for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
502 let context = self.context.clone();
503 let blocks = self.blocks.clone();
504 let gc_rounds = gc_rounds.clone();
505 let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
506
507 join_set.spawn(tokio::task::spawn_blocking(move || {
508 let mut chunk_results = Vec::new();
509
510 for (block_ref, pending_transactions) in chunk {
511 let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
512 &context,
513 &blocks,
514 &gc_rounds,
515 block_ref,
516 pending_transactions,
517 );
518
519 if !finalized.is_empty() {
520 chunk_results.push((block_ref, finalized));
521 }
522 }
523
524 chunk_results
525 }));
526 }
527
528 while let Some(result) = join_set.join_next().await {
530 let e = match result {
531 Ok(blocking_result) => match blocking_result {
532 Ok(chunk_results) => {
533 all_finalized_transactions.extend(chunk_results);
534 continue;
535 }
536 Err(e) => e,
537 },
538 Err(e) => e,
539 };
540 if e.is_panic() {
541 std::panic::resume_unwind(e.into_panic());
542 }
543 tracing::info!("Process likely shutting down: {:?}", e);
544 return;
546 }
547
548 for (block_ref, finalized_transactions) in all_finalized_transactions {
549 self.context
550 .metrics
551 .node_metrics
552 .finalizer_transaction_status
553 .with_label_values(&["indirect_finalize"])
554 .inc_by(finalized_transactions.len() as u64);
555 self.pending_commits[0]
557 .remove_pending_transactions(&block_ref, &finalized_transactions);
558 }
559 }
560
561 fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
562 let curr_leader_round = self.pending_commits[0].commit.leader.round;
563 let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
564 if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
565 let curr_commit_state = &mut self.pending_commits[0];
566 assert!(curr_commit_state.pending_blocks.is_empty());
570 let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
574 for (block_ref, pending_transactions) in pending_transactions {
575 self.context
576 .metrics
577 .node_metrics
578 .finalizer_transaction_status
579 .with_label_values(&["indirect_reject"])
580 .inc_by(pending_transactions.len() as u64);
581 curr_commit_state
582 .rejected_transactions
583 .entry(block_ref)
584 .or_default()
585 .extend(pending_transactions);
586 }
587 }
588 }
589
/// Traverses the descendants of `pending_block_ref` recorded in `blocks` and returns the
/// indices among `pending_transactions` for which the accept-vote aggregator reported success.
///
/// A visited block contributes an accept vote from its author to every still-undecided
/// transaction that is not listed in the block's reject votes for `pending_block_ref`.
/// Blocks marked as ignored, and (when the protocol flag is enabled) blocks whose earlier
/// votes may have been garbage collected, contribute no votes.
///
/// `gc_rounds` holds one `(commit index, GC round)` pair per pending commit and is consulted
/// through `gced_transaction_votes_for_pending_block`.
///
/// # Panics
/// Panics if the pending block or any visited block is missing from `blocks`.
fn try_indirect_finalize_pending_transactions_in_block(
    context: &Arc<Context>,
    blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
    gc_rounds: &[(CommitIndex, Round)],
    pending_block_ref: BlockRef,
    pending_transactions: BTreeSet<TransactionIndex>,
) -> Vec<TransactionIndex> {
    if pending_transactions.is_empty() {
        return vec![];
    }
    // One stake aggregator per transaction that still needs accept votes.
    let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
        pending_transactions
            .into_iter()
            .map(|transaction_index| (transaction_index, StakeAggregator::new()))
            .collect();
    let mut finalized_transactions = vec![];
    let blocks_map = blocks.read();
    // Start from the direct children of the pending block. The work list is a BTreeSet, so
    // pop_first() always yields the smallest remaining BlockRef.
    let (pending_commit_index, mut to_visit_blocks) = {
        let block_state = blocks_map.get(&pending_block_ref).unwrap().read();
        (block_state.commit_index, block_state.children.clone())
    };
    // Blocks already taken from the work list.
    let mut visited = BTreeSet::new();
    // Blocks whose votes must not be counted: blocks already processed here plus the origin
    // descendants of processed blocks.
    let mut ignored = BTreeSet::new();
    while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
        if !visited.insert(curr_block_ref) {
            continue;
        }
        let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
        // Whether the pending block's round is at or below the GC round in effect for the
        // commit that contains the current block.
        let votes_gced = Self::gced_transaction_votes_for_pending_block(
            gc_rounds,
            pending_block_ref.round,
            pending_commit_index,
            curr_block_state.commit_index,
        );
        // insert() returns false when the block was already marked as ignored (for example as
        // an origin descendant of an earlier block); its votes are then skipped.
        if ignored.insert(curr_block_ref) {
            // Later blocks from the same origin must not vote again on these transactions.
            ignored.extend(curr_block_state.origin_descendants.iter());
            if context.protocol_config.consensus_skip_gced_accept_votes() && votes_gced {
                let hostname = &context.committee.authority(curr_block_ref.author).hostname;
                context
                    .metrics
                    .node_metrics
                    .finalizer_skipped_voting_blocks
                    .with_label_values(&[hostname])
                    .inc();
                // NOTE: this `continue` also skips queuing the current block's children below.
                continue;
            }
            // Transactions of the pending block explicitly rejected by the current block.
            let curr_block_reject_votes = curr_block_state
                .reject_votes
                .get(&pending_block_ref)
                .cloned()
                .unwrap_or_default();
            // Collected separately because accept_votes is mutably borrowed by the loop below.
            let mut newly_finalized = vec![];
            for (index, stake) in &mut accept_votes {
                if curr_block_reject_votes.contains(index) {
                    continue;
                }
                // add() presumably returns true once the aggregated stake reaches quorum —
                // TODO confirm against StakeAggregator.
                if !stake.add(curr_block_ref.author, &context.committee) {
                    continue;
                }
                newly_finalized.push(*index);
                finalized_transactions.push(*index);
            }
            // Finalized transactions need no further votes.
            for index in newly_finalized {
                accept_votes.remove(&index);
            }
            // Stop early when every requested transaction has been finalized.
            if accept_votes.is_empty() {
                break;
            }
        }
        // Queue the children of the current block that have not been visited yet.
        to_visit_blocks.extend(
            curr_block_state
                .children
                .iter()
                .filter(|b| !visited.contains(*b)),
        );
    }
    finalized_transactions
}
702
703 fn gced_transaction_votes_for_pending_block(
721 gc_rounds: &[(CommitIndex, Round)],
722 pending_block_round: Round,
723 pending_commit_index: CommitIndex,
724 current_commit_index: CommitIndex,
725 ) -> bool {
726 assert!(
727 pending_commit_index <= current_commit_index,
728 "Pending {pending_commit_index} should be <= current {current_commit_index}"
729 );
730 if pending_commit_index == current_commit_index {
731 return false;
732 }
733 let (commit_index, gc_round) = *gc_rounds
737 .get((current_commit_index - 1 - pending_commit_index) as usize)
738 .unwrap();
739 assert_eq!(
740 commit_index,
741 current_commit_index - 1,
742 "Commit index mismatch {commit_index} != {current_commit_index}"
743 );
744 pending_block_round <= gc_round
745 }
746
747 fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
748 let mut finalized_commits = vec![];
749
750 while let Some(commit_state) = self.pending_commits.front() {
751 if !commit_state.pending_blocks.is_empty()
752 || !commit_state.pending_transactions.is_empty()
753 {
754 break;
756 }
757
758 let commit_state = self.pending_commits.pop_front().unwrap();
760 let mut commit = commit_state.commit;
761 for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
762 commit
763 .rejected_transactions_by_block
764 .insert(block_ref, rejected_transactions.into_iter().collect());
765 }
766
767 let mut blocks_map = self.blocks.write();
769 for block in commit.blocks.iter() {
770 blocks_map.remove(&block.reference());
771 }
772
773 let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
774 last_commit_state.commit.leader.round - commit.leader.round
775 } else {
776 0
777 };
778 self.context
779 .metrics
780 .node_metrics
781 .finalizer_round_delay
782 .observe(round_delay as f64);
783
784 finalized_commits.push(commit);
785 }
786
787 finalized_commits
788 }
789
790 fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
791 let gc_round = self
794 .dag_state
795 .read()
796 .calculate_gc_round(last_finalized_commit_round);
797 self.transaction_certifier.run_gc(gc_round);
798 }
799
/// Test helper: `true` when no commit is pending and the block map holds no entries.
#[cfg(test)]
fn is_empty(&self) -> bool {
    if !self.pending_commits.is_empty() {
        return false;
    }
    self.blocks.read().is_empty()
}
804}
805
806struct CommitState {
807 commit: CommittedSubDag,
808 pending_blocks: BTreeMap<BlockRef, usize>,
812 pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
817 rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
819}
820
821impl CommitState {
822 fn new(commit: CommittedSubDag) -> Self {
823 let pending_blocks: BTreeMap<_, _> = commit
824 .blocks
825 .iter()
826 .map(|b| (b.reference(), b.transactions().len()))
827 .collect();
828 assert!(!pending_blocks.is_empty());
829 Self {
830 commit,
831 pending_blocks,
832 pending_transactions: BTreeMap::new(),
833 rejected_transactions: BTreeMap::new(),
834 }
835 }
836
837 fn remove_pending_transactions(
838 &mut self,
839 block_ref: &BlockRef,
840 transactions: &[TransactionIndex],
841 ) {
842 let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
843 return;
844 };
845 for t in transactions {
846 block_pending_txns.remove(t);
847 }
848 if block_pending_txns.is_empty() {
849 self.pending_transactions.remove(block_ref);
850 }
851 }
852}
853
854struct BlockState {
855 block: VerifiedBlock,
857 children: BTreeSet<BlockRef>,
859 reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
861 origin_descendants: Vec<BlockRef>,
864 commit_index: CommitIndex,
866}
867
868impl BlockState {
869 fn new(block: VerifiedBlock, commit_index: CommitIndex) -> Self {
870 let reject_votes: BTreeMap<_, _> = block
871 .transaction_votes()
872 .iter()
873 .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
874 .collect();
875 let origin_descendants = Vec::with_capacity(8);
878 Self {
879 block,
880 children: BTreeSet::new(),
881 reject_votes,
882 origin_descendants,
883 commit_index,
884 }
885 }
886}
887
888#[cfg(test)]
889mod tests {
890 use mysten_metrics::monitored_mpsc;
891 use parking_lot::RwLock;
892
893 use crate::{
894 TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
895 dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
896 test_dag_builder::DagBuilder,
897 };
898
899 use super::*;
900
901 struct Fixture {
902 context: Arc<Context>,
903 dag_state: Arc<RwLock<DagState>>,
904 transaction_certifier: TransactionCertifier,
905 linearizer: Linearizer,
906 commit_finalizer: CommitFinalizer,
907 }
908
909 impl Fixture {
910 fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
911 self.transaction_certifier
912 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
913 self.dag_state.write().accept_blocks(blocks);
914 }
915 }
916
917 fn create_commit_finalizer_fixture() -> Fixture {
918 let (mut context, _keys) = Context::new_for_test(4);
919 context
920 .protocol_config
921 .set_consensus_gc_depth_for_testing(5);
922 context
923 .protocol_config
924 .set_consensus_skip_gced_accept_votes_for_testing(true);
925 let context = Arc::new(context);
926 let dag_state = Arc::new(RwLock::new(DagState::new(
927 context.clone(),
928 Arc::new(MemStore::new()),
929 )));
930 let linearizer = Linearizer::new(context.clone(), dag_state.clone());
931 let (blocks_sender, _blocks_receiver) =
932 monitored_mpsc::unbounded_channel("consensus_block_output");
933 let transaction_certifier = TransactionCertifier::new(
934 context.clone(),
935 Arc::new(NoopBlockVerifier {}),
936 dag_state.clone(),
937 blocks_sender,
938 );
939 let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
940 let commit_finalizer = CommitFinalizer::new(
941 context.clone(),
942 dag_state.clone(),
943 transaction_certifier.clone(),
944 commit_sender,
945 );
946 Fixture {
947 context,
948 dag_state,
949 transaction_certifier,
950 linearizer,
951 commit_finalizer,
952 }
953 }
954
955 fn create_block(
956 round: Round,
957 authority: u32,
958 mut ancestors: Vec<BlockRef>,
959 num_transactions: usize,
960 reject_votes: Vec<BlockTransactionVotes>,
961 ) -> VerifiedBlock {
962 let i = ancestors
964 .iter()
965 .position(|b| b.author.value() == authority as usize)
966 .unwrap_or_else(|| {
967 panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
968 });
969 let b = ancestors.remove(i);
970 ancestors.insert(0, b);
971 let block = TestBlock::new(round, authority)
973 .set_ancestors(ancestors)
974 .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
975 .set_transaction_votes(reject_votes)
976 .build();
977 VerifiedBlock::new_for_test(block)
978 }
979
980 #[tokio::test]
981 async fn test_direct_finalize_no_reject_votes() {
982 let mut fixture = create_commit_finalizer_fixture();
983
984 let mut dag_builder = DagBuilder::new(fixture.context.clone());
986 dag_builder
987 .layers(1..=4)
988 .num_transactions(10)
989 .build()
990 .persist_layers(fixture.dag_state.clone());
991 let blocks = dag_builder.all_blocks();
992 fixture
993 .transaction_certifier
994 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
995
996 let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
998 let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
999 assert_eq!(committed_sub_dags.len(), 1);
1000 let committed_sub_dag = &committed_sub_dags[0];
1001
1002 let finalized_commits = fixture
1004 .commit_finalizer
1005 .process_commit(committed_sub_dag.clone())
1006 .await;
1007 assert_eq!(finalized_commits.len(), 1);
1008 let finalized_commit = &finalized_commits[0];
1009 assert_eq!(committed_sub_dag, finalized_commit);
1010
1011 assert!(fixture.commit_finalizer.is_empty());
1013 }
1014
/// Transactions 0 and 3 of the round-1 block at index 3 receive reject votes from several
/// authorities; the commit is expected to finalize in a single `process_commit` call with
/// exactly those two transactions rejected.
#[tokio::test]
async fn test_direct_finalize_with_reject_votes() {
    let mut fixture = create_commit_finalizer_fixture();

    // Round 1: one block per authority, 10 transactions each.
    let mut dag_builder = DagBuilder::new(fixture.context.clone());
    dag_builder
        .layer(1)
        .num_transactions(10)
        .build()
        .persist_layers(fixture.dag_state.clone());
    let round_1_blocks = dag_builder.all_blocks();
    // Register the round-1 blocks; only authority 3's block is paired with votes [0, 3]
    // (presumably the local node's own reject votes — TODO confirm against add_voted_blocks).
    fixture.transaction_certifier.add_voted_blocks(
        round_1_blocks
            .iter()
            .map(|b| {
                if b.author().value() != 3 {
                    (b.clone(), vec![])
                } else {
                    (b.clone(), vec![0, 3])
                }
            })
            .collect(),
    );

    // Assumes all_blocks() returns the round-1 blocks ordered by authority, so index 3 is
    // authority 3's block — TODO confirm.
    let block_with_rejected_txn = round_1_blocks[3].clone();
    // Vote rejecting transactions 0 and 3 of that block, reused by several blocks below.
    let reject_vote = BlockTransactionVotes {
        block_ref: block_with_rejected_txn.reference(),
        rejects: vec![0, 3],
    };

    // Round 2: blocks from authorities 0, 1 and 2 only. Authority 0 links to all round-1 blocks
    // and carries the reject vote; authorities 1 and 2 link only to round-1 blocks 0..3.
    let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
    let round_2_blocks = vec![
        create_block(
            2,
            0,
            round_1_blocks.iter().map(|b| b.reference()).collect(),
            10,
            vec![reject_vote.clone()],
        ),
        create_block(2, 1, ancestors.clone(), 10, vec![]),
        create_block(2, 2, ancestors.clone(), 10, vec![]),
    ];
    fixture.add_blocks(round_2_blocks.clone());

    // Commit authority 0's round-2 block; the sub-DAG holds the 4 round-1 blocks plus the leader.
    let leader = round_2_blocks[0].clone();
    let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
    assert_eq!(committed_sub_dags.len(), 1);
    let committed_sub_dag = &committed_sub_dags[0];
    assert_eq!(committed_sub_dag.blocks.len(), 5);

    // Round 3: all four authorities. Authorities 1, 2 and 3 carry the reject vote; authority 3
    // additionally links to its own round-1 block because it has no round-2 block.
    let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
    let round_3_blocks = vec![
        create_block(3, 0, ancestors.clone(), 0, vec![]),
        create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
        create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
        create_block(
            3,
            3,
            std::iter::once(round_1_blocks[3].reference())
                .chain(ancestors.clone())
                .collect(),
            0,
            vec![reject_vote.clone()],
        ),
    ];
    fixture.add_blocks(round_3_blocks.clone());

    // Round 4: all four authorities, no transactions and no votes.
    let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
    let round_4_blocks = vec![
        create_block(4, 0, ancestors.clone(), 0, vec![]),
        create_block(4, 1, ancestors.clone(), 0, vec![]),
        create_block(4, 2, ancestors.clone(), 0, vec![]),
        create_block(4, 3, ancestors.clone(), 0, vec![]),
    ];
    fixture.add_blocks(round_4_blocks.clone());

    // The commit finalizes immediately, with transactions 0 and 3 rejected for the one block.
    let finalized_commits = fixture
        .commit_finalizer
        .process_commit(committed_sub_dag.clone())
        .await;
    assert_eq!(finalized_commits.len(), 1);
    let finalized_commit = &finalized_commits[0];
    assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
    assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
    assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
    assert_eq!(
        finalized_commit
            .rejected_transactions_by_block
            .get(&block_with_rejected_txn.reference())
            .unwrap()
            .clone(),
        vec![0, 3],
    );

    // Nothing is left buffered.
    assert!(fixture.commit_finalizer.is_empty());
}
1123
/// Four consecutive commits are fed to the finalizer. The first three calls return nothing;
/// the fourth call returns all four commits, with transactions 1 and 4 of the round-1 block at
/// index 3 rejected in the first commit and no rejections in the others.
#[tokio::test]
async fn test_indirect_finalize_with_reject_votes() {
    let mut fixture = create_commit_finalizer_fixture();

    // Round 1: one block per authority, 10 transactions each.
    let mut dag_builder = DagBuilder::new(fixture.context.clone());
    dag_builder
        .layer(1)
        .num_transactions(10)
        .build()
        .persist_layers(fixture.dag_state.clone());
    let round_1_blocks = dag_builder.all_blocks();
    // Register the round-1 blocks; only authority 3's block is paired with votes [0, 3]
    // (presumably the local node's own reject votes — TODO confirm against add_voted_blocks).
    fixture.transaction_certifier.add_voted_blocks(
        round_1_blocks
            .iter()
            .map(|b| {
                if b.author().value() != 3 {
                    (b.clone(), vec![])
                } else {
                    (b.clone(), vec![0, 3])
                }
            })
            .collect(),
    );

    // Assumes all_blocks() returns the round-1 blocks ordered by authority, so index 3 is
    // authority 3's block — TODO confirm.
    let block_with_rejected_txn = round_1_blocks[3].clone();
    // Round 2: blocks from authorities 0, 1 and 2 only. Authority 0 links to all round-1 blocks
    // and rejects transactions 1 and 4; authorities 1 and 2 link only to round-1 blocks 0..3.
    let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
    let round_2_blocks = vec![
        create_block(
            2,
            0,
            round_1_blocks.iter().map(|b| b.reference()).collect(),
            10,
            vec![BlockTransactionVotes {
                block_ref: block_with_rejected_txn.reference(),
                rejects: vec![1, 4],
            }],
        ),
        create_block(2, 1, ancestors.clone(), 10, vec![]),
        create_block(2, 2, ancestors.clone(), 10, vec![]),
    ];
    fixture.add_blocks(round_2_blocks.clone());

    // First leader: authority 0's round-2 block.
    let mut leaders = vec![round_2_blocks[0].clone()];

    // Round 3: blocks from authorities 0, 1 and 3 (authority 2 is absent). Authority 1 rejects
    // transactions 1, 4 and 7; authority 3 also links to its own round-1 block.
    let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
    let round_3_blocks = vec![
        create_block(3, 0, ancestors.clone(), 0, vec![]),
        create_block(
            3,
            1,
            ancestors.clone(),
            0,
            vec![BlockTransactionVotes {
                block_ref: block_with_rejected_txn.reference(),
                rejects: vec![1, 4, 7],
            }],
        ),
        create_block(
            3,
            3,
            std::iter::once(round_1_blocks[3].reference())
                .chain(ancestors.clone())
                .collect(),
            0,
            vec![],
        ),
    ];
    fixture.add_blocks(round_3_blocks.clone());
    // Second leader: authority 3's round-3 block (index 2 of the round-3 vector).
    leaders.push(round_3_blocks[2].clone());

    // Round 4: all four authorities. Authority 2 links to its own round-2 block, since it has
    // no round-3 block, and rejects transaction 1.
    let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
    let round_4_blocks = vec![
        create_block(4, 0, ancestors.clone(), 0, vec![]),
        create_block(4, 1, ancestors.clone(), 0, vec![]),
        create_block(
            4,
            2,
            std::iter::once(round_2_blocks[2].reference())
                .chain(ancestors.clone())
                .collect(),
            0,
            vec![BlockTransactionVotes {
                block_ref: block_with_rejected_txn.reference(),
                rejects: vec![1],
            }],
        ),
        create_block(4, 3, ancestors.clone(), 0, vec![]),
    ];
    fixture.add_blocks(round_4_blocks.clone());
    // Third leader: authority 1's round-4 block.
    leaders.push(round_4_blocks[1].clone());

    // Rounds 5..=7: fully connected rounds without transactions or votes. Authority 0's
    // round-5 block becomes the fourth leader.
    let mut last_round_blocks = round_4_blocks.clone();
    for r in 5..=7 {
        let ancestors: Vec<BlockRef> =
            last_round_blocks.iter().map(|b| b.reference()).collect();
        let round_blocks: Vec<_> = (0..4)
            .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
            .collect();
        fixture.add_blocks(round_blocks.clone());
        if r == 5 {
            leaders.push(round_blocks[0].clone());
        }
        last_round_blocks = round_blocks;
    }

    // Four leaders yield four committed sub-DAGs.
    assert_eq!(leaders.len(), 4);
    let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
    assert_eq!(committed_sub_dags.len(), 4);

    // The first three commits stay buffered.
    for commit in committed_sub_dags.iter().take(3) {
        let finalized_commits = fixture
            .commit_finalizer
            .process_commit(commit.clone())
            .await;
        assert_eq!(finalized_commits.len(), 0);
    }

    // Processing the fourth commit releases all four commits at once.
    let finalized_commits = fixture
        .commit_finalizer
        .process_commit(committed_sub_dags[3].clone())
        .await;
    assert_eq!(finalized_commits.len(), 4);

    // The first commit rejects exactly transactions 1 and 4 of the targeted block.
    let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
    assert_eq!(rejected_transactions.len(), 1);
    assert_eq!(
        rejected_transactions
            .get(&block_with_rejected_txn.reference())
            .unwrap(),
        &vec![1, 4]
    );

    // The remaining commits carry no rejected transactions.
    for commit in finalized_commits.iter().skip(1) {
        assert!(commit.rejected_transactions_by_block.is_empty());
    }

    // Nothing is left buffered.
    assert!(fixture.commit_finalizer.is_empty());
}
1288
1289 #[tokio::test]
1291 async fn test_indirect_reject_with_gc() {
1292 let mut fixture = create_commit_finalizer_fixture();
1293 assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1294
1295 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1297 dag_builder
1298 .layer(1)
1299 .num_transactions(10)
1300 .build()
1301 .persist_layers(fixture.dag_state.clone());
1302 let round_1_blocks = dag_builder.all_blocks();
1303 fixture
1304 .transaction_certifier
1305 .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1306
1307 let block_with_rejected_txn = round_1_blocks[3].clone();
1309 let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1316 let round_2_blocks = vec![
1317 create_block(2, 0, ancestors.clone(), 0, vec![]),
1318 create_block(
1319 2,
1320 1,
1321 ancestors.clone(),
1322 0,
1323 vec![BlockTransactionVotes {
1324 block_ref: block_with_rejected_txn.reference(),
1325 rejects: vec![1],
1326 }],
1327 ),
1328 create_block(2, 2, ancestors.clone(), 0, vec![]),
1329 create_block(2, 3, ancestors.clone(), 0, vec![]),
1330 ];
1331 fixture.add_blocks(round_2_blocks.clone());
1332
1333 let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1336 .iter()
1337 .enumerate()
1338 .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1339 .collect();
1340 for r in 3..=6 {
1341 let ancestors: Vec<BlockRef> =
1342 last_round_blocks.iter().map(|b| b.reference()).collect();
1343 last_round_blocks = [0, 1, 3]
1344 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1345 .to_vec();
1346 fixture.add_blocks(last_round_blocks.clone());
1347 }
1348
1349 let mut leaders = vec![];
1351 for r in 7..=10 {
1352 let mut ancestors: Vec<BlockRef> =
1353 last_round_blocks.iter().map(|b| b.reference()).collect();
1354 last_round_blocks = (0..4)
1355 .map(|i| {
1356 if r == 7 && i == 2 {
1357 ancestors.push(round_2_blocks[2].reference());
1359 }
1360 create_block(r, i, ancestors.clone(), 0, vec![])
1361 })
1362 .collect();
1363 leaders.push(last_round_blocks[0].clone());
1364 fixture.add_blocks(last_round_blocks.clone());
1365 }
1366
1367 assert_eq!(leaders.len(), 4);
1369 let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1370 assert_eq!(committed_sub_dags.len(), 4);
1371
1372 assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1374 for commit in committed_sub_dags.iter() {
1376 assert!(!commit.blocks.contains(&round_2_blocks[2]));
1377 }
1378
1379 for commit in committed_sub_dags.iter().take(3) {
1381 assert!(commit.decided_with_local_blocks);
1382 let finalized_commits = fixture
1383 .commit_finalizer
1384 .process_commit(commit.clone())
1385 .await;
1386 assert_eq!(finalized_commits.len(), 0);
1387 }
1388
1389 let finalized_commits = fixture
1391 .commit_finalizer
1392 .process_commit(committed_sub_dags[3].clone())
1393 .await;
1394 assert_eq!(finalized_commits.len(), 4);
1395
1396 let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1400 assert_eq!(rejected_transactions.len(), 1);
1401 assert_eq!(
1402 rejected_transactions
1403 .get(&block_with_rejected_txn.reference())
1404 .unwrap(),
1405 &vec![1]
1406 );
1407
1408 for commit in finalized_commits.iter().skip(1) {
1410 assert!(commit.rejected_transactions_by_block.is_empty());
1411 }
1412
1413 assert!(fixture.commit_finalizer.is_empty());
1415 }
1416
1417 #[tokio::test]
1418 async fn test_finalize_remote_commits_with_reject_votes() {
1419 let mut fixture: Fixture = create_commit_finalizer_fixture();
1420 let mut all_blocks = vec![];
1421
1422 let mut dag_builder = DagBuilder::new(fixture.context.clone());
1424 dag_builder.layer(1).num_transactions(10).build();
1425 let round_1_blocks = dag_builder.all_blocks();
1426 all_blocks.push(round_1_blocks.clone());
1427
1428 let mut leaders = vec![round_1_blocks[0].clone()];
1430
1431 let mut last_round_blocks = round_1_blocks.clone();
1433 for r in 2..=9 {
1434 let ancestors: Vec<BlockRef> =
1435 last_round_blocks.iter().map(|b| b.reference()).collect();
1436 let round_blocks: Vec<_> = (0..4)
1437 .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1438 .collect();
1439 all_blocks.push(round_blocks.clone());
1440 if r <= 7 && r != 5 {
1441 leaders.push(round_blocks[r as usize % 4].clone());
1442 }
1443 last_round_blocks = round_blocks;
1444 }
1445
1446 assert_eq!(leaders.len(), 6);
1448
1449 async fn add_blocks_and_process_commit(
1450 fixture: &mut Fixture,
1451 leaders: &[VerifiedBlock],
1452 all_blocks: &[Vec<VerifiedBlock>],
1453 index: usize,
1454 local: bool,
1455 ) -> Vec<CommittedSubDag> {
1456 let leader = leaders[index].clone();
1457 if local {
1459 for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1460 fixture.add_blocks(round_blocks.clone());
1461 }
1462 } else {
1463 for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1464 fixture.add_blocks(round_blocks.clone());
1465 }
1466 };
1467 let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1469 assert_eq!(committed_sub_dags.len(), 1);
1470 let mut remote_commit = committed_sub_dags.pop().unwrap();
1471 remote_commit.decided_with_local_blocks = local;
1472 fixture
1474 .commit_finalizer
1475 .process_commit(remote_commit.clone())
1476 .await
1477 }
1478
1479 for i in 0..3 {
1481 let finalized_commits =
1482 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1483 assert!(finalized_commits.is_empty());
1484 }
1485
1486 let finalized_commits =
1488 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1489 assert_eq!(finalized_commits.len(), 1);
1490 assert_eq!(finalized_commits[0].commit_ref.index, 1);
1491 assert_eq!(finalized_commits[0].leader.round, 1);
1492
1493 let finalized_commits =
1495 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1496 assert_eq!(finalized_commits.len(), 2);
1497 assert_eq!(finalized_commits[0].commit_ref.index, 2);
1498 assert_eq!(finalized_commits[0].leader.round, 2);
1499 assert_eq!(finalized_commits[1].commit_ref.index, 3);
1500 assert_eq!(finalized_commits[1].leader.round, 3);
1501
1502 let finalized_commits =
1504 add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1505 assert_eq!(finalized_commits.len(), 3);
1506 assert_eq!(finalized_commits[0].commit_ref.index, 4);
1507 assert_eq!(finalized_commits[0].leader.round, 4);
1508 assert_eq!(finalized_commits[1].commit_ref.index, 5);
1509 assert_eq!(finalized_commits[1].leader.round, 6);
1510 assert_eq!(finalized_commits[2].commit_ref.index, 6);
1511 assert_eq!(finalized_commits[2].leader.round, 7);
1512
1513 assert!(fixture.commit_finalizer.is_empty());
1515 }
1516}