consensus_core/
commit_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12    monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13    monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19    BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20    commit::DEFAULT_WAVE_LENGTH,
21    context::Context,
22    dag_state::DagState,
23    error::{ConsensusError, ConsensusResult},
24    stake_aggregator::{QuorumThreshold, StakeAggregator},
25    transaction_certifier::TransactionCertifier,
26};
27
28/// For transaction T committed at leader round R, when a new leader at round >= R + INDIRECT_REJECT_DEPTH
29/// commits and T is still not finalized, T is rejected.
30/// NOTE: 3 round is the minimum depth possible for indirect finalization and rejection.
31pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33/// Handle to CommitFinalizer, for sending CommittedSubDag.
34pub(crate) struct CommitFinalizerHandle {
35    sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39    // Sends a CommittedSubDag to CommitFinalizer, which will finalize it before sending it to execution.
40    pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41        self.sender.send(commit).map_err(|e| {
42            tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43            ConsensusError::Shutdown
44        })
45    }
46}
47
48/// CommitFinalizer accepts a continuous stream of CommittedSubDag and outputs
49/// them when they are finalized.
50/// In finalized commits, every transaction is either finalized or rejected.
51/// It runs in a separate thread, to reduce the load on the core thread.
52///
53/// Life of a finalized commit:
54///
55/// For efficiency, finalization happens first for transactions without reject votes (common case).
56/// The pending undecided transactions with reject votes are individually finalized or rejected.
57/// When there is no more pending transactions, the commit is finalized.
58///
59/// This is correct because regardless if a commit leader was directly or indirectly committed,
60/// every committed block can be considered finalized, because at least one leader certificate of the commit
61/// will be committed, which can also serve as a certificate for the block and its transactions.
62///
63/// From the earliest buffered commit, pending blocks are checked to see if they are now finalized.
64/// New finalized blocks are removed from the pending blocks, and its transactions are moved to the
65/// finalized, rejected or pending state. If the commit now has no pending blocks or transactions,
66/// the commit is finalized and popped from the buffer. The next earliest commit is then processed
67/// similarly, until either the buffer becomes empty or a commit with pending blocks or transactions
68/// is encountered.
69pub struct CommitFinalizer {
70    context: Arc<Context>,
71    dag_state: Arc<RwLock<DagState>>,
72    transaction_certifier: TransactionCertifier,
73    commit_sender: UnboundedSender<CommittedSubDag>,
74
75    // Last commit index processed by CommitFinalizer.
76    last_processed_commit: Option<CommitIndex>,
77    // Commits pending finalization.
78    pending_commits: VecDeque<CommitState>,
79    // Blocks in the pending commits.
80    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
81}
82
83impl CommitFinalizer {
84    pub fn new(
85        context: Arc<Context>,
86        dag_state: Arc<RwLock<DagState>>,
87        transaction_certifier: TransactionCertifier,
88        commit_sender: UnboundedSender<CommittedSubDag>,
89    ) -> Self {
90        Self {
91            context,
92            dag_state,
93            transaction_certifier,
94            commit_sender,
95            last_processed_commit: None,
96            pending_commits: VecDeque::new(),
97            blocks: Arc::new(RwLock::new(BTreeMap::new())),
98        }
99    }
100
101    pub(crate) fn start(
102        context: Arc<Context>,
103        dag_state: Arc<RwLock<DagState>>,
104        transaction_certifier: TransactionCertifier,
105        commit_sender: UnboundedSender<CommittedSubDag>,
106    ) -> CommitFinalizerHandle {
107        let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108        let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109        let _handle =
110            spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111        CommitFinalizerHandle { sender }
112    }
113
114    async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115        while let Some(committed_sub_dag) = receiver.recv().await {
116            let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117                || committed_sub_dag.recovered_rejected_transactions;
118            let finalized_commits = if !already_finalized {
119                self.process_commit(committed_sub_dag).await
120            } else {
121                vec![committed_sub_dag]
122            };
123            if !finalized_commits.is_empty() {
124                // Transaction certifier state should be GC'ed as soon as new commits are finalized.
125                // But this is done outside of process_commit(), because during recovery process_commit()
126                // is not called to finalize commits, but GC still needs to run.
127                self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128                let mut dag_state = self.dag_state.write();
129                if !already_finalized {
130                    // Records rejected transactions in newly finalized commits.
131                    for commit in &finalized_commits {
132                        dag_state.add_finalized_commit(
133                            commit.commit_ref,
134                            commit.rejected_transactions_by_block.clone(),
135                        );
136                    }
137                }
138                // Commits and committed blocks must be persisted to storage before sending them to Sui
139                // to execute their finalized transactions.
140                // Commit metadata and uncommitted blocks can be persisted more lazily because they are recoverable.
141                // But for simplicity, all unpersisted commits and blocks are flushed to storage.
142                dag_state.flush();
143            }
144            for commit in finalized_commits {
145                if let Err(e) = self.commit_sender.send(commit) {
146                    tracing::warn!(
147                        "Failed to send to commit handler, probably due to shutdown: {e:?}"
148                    );
149                    return;
150                }
151            }
152        }
153    }
154
155    pub async fn process_commit(
156        &mut self,
157        committed_sub_dag: CommittedSubDag,
158    ) -> Vec<CommittedSubDag> {
159        let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161        if let Some(last_processed_commit) = self.last_processed_commit {
162            assert_eq!(
163                last_processed_commit + 1,
164                committed_sub_dag.commit_ref.index
165            );
166        }
167        self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169        self.pending_commits
170            .push_back(CommitState::new(committed_sub_dag));
171
172        let mut finalized_commits = vec![];
173
174        // The prerequisite for running direct finalization on a commit is that the commit must
175        // have either a quorum of leader certificates in the local DAG, or a committed leader certificate.
176        //
177        // A leader certificate is a finalization certificate for every block in the commit.
178        // When the prerequisite holds, all blocks in the current commit can be considered finalized.
179        // And any transaction in the current commit that has not observed reject votes will never be rejected.
180        // So these transactions are directly finalized.
181        //
182        // When a commit is direct, there are a quorum of its leader certificates in the local DAG.
183        //
184        // When a commit is indirect, it implies one of its leader certificates is in the committed blocks.
185        // So a leader certificate must exist in the local DAG as well.
186        //
187        // When a commit is received through commit sync and processed as certified commit, the commit might
188        // not have a leader certificate in the local DAG. So a committed transaction might not observe any reject
189        // vote from local DAG, although it will eventually get rejected. To finalize blocks in this commit,
190        // there must be another commit with leader round >= 3 (WAVE_LENGTH) rounds above the commit leader.
191        // From the indirect commit rule, a leader certificate must exist in committed blocks for the earliest commit.
192        for i in 0..self.pending_commits.len() {
193            let commit_state = &self.pending_commits[i];
194            if commit_state.pending_blocks.is_empty() {
195                // The commit has already been processed through direct finalization.
196                continue;
197            }
198            // Direct finalization cannot happen when
199            // -  This commit is remote.
200            // -  And the latest commit is less than 3 (WAVE_LENGTH) rounds above this commit.
201            // In this case, this commit's leader certificate is not guaranteed to be in local DAG.
202            if !commit_state.commit.decided_with_local_blocks {
203                let last_commit_state = self.pending_commits.back().unwrap();
204                if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205                    > last_commit_state.commit.leader.round
206                {
207                    break;
208                }
209            }
210            self.try_direct_finalize_commit(i);
211        }
212        let direct_finalized_commits = self.pop_finalized_commits();
213        self.context
214            .metrics
215            .node_metrics
216            .finalizer_output_commits
217            .with_label_values(&["direct"])
218            .inc_by(direct_finalized_commits.len() as u64);
219        finalized_commits.extend(direct_finalized_commits);
220
221        // Indirect finalization: one or more commits cannot be directly finalized.
222        // So the pending transactions need to be checked for indirect finalization.
223        if !self.pending_commits.is_empty() {
224            // Initialize the state of the last added commit for computing indirect finalization.
225            //
226            // As long as there are remaining commits, even if the last commit has been directly finalized,
227            // its state still needs to be initialized here to help indirectly finalize previous commits.
228            // This is because the last commit may have been directly finalized, but its previous commits
229            // may not have been directly finalized.
230            self.link_blocks_in_last_commit();
231            self.append_origin_descendants_from_last_commit();
232            // Try to indirectly finalize a prefix of the buffered commits.
233            // If only one commit remains, it cannot be indirectly finalized because there is no commit afterwards,
234            // so it is excluded.
235            while self.pending_commits.len() > 1 {
236                // Stop indirect finalization when the earliest commit has not been processed
237                // through direct finalization.
238                if !self.pending_commits[0].pending_blocks.is_empty() {
239                    break;
240                }
241                // Otherwise, try to indirectly finalize the earliest commit.
242                self.try_indirect_finalize_first_commit().await;
243                let indirect_finalized_commits = self.pop_finalized_commits();
244                if indirect_finalized_commits.is_empty() {
245                    // No additional commits can be indirectly finalized.
246                    break;
247                }
248                self.context
249                    .metrics
250                    .node_metrics
251                    .finalizer_output_commits
252                    .with_label_values(&["indirect"])
253                    .inc_by(indirect_finalized_commits.len() as u64);
254                finalized_commits.extend(indirect_finalized_commits);
255            }
256        }
257
258        self.context
259            .metrics
260            .node_metrics
261            .finalizer_buffered_commits
262            .set(self.pending_commits.len() as i64);
263
264        finalized_commits
265    }
266
267    // Tries directly finalizing transactions in the commit.
268    fn try_direct_finalize_commit(&mut self, index: usize) {
269        let num_commits = self.pending_commits.len();
270        let commit_state = self
271            .pending_commits
272            .get_mut(index)
273            .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274        // Direct commit means every transaction in the commit can be considered to have a quorum of post-commit certificates,
275        // unless the transaction has reject votes that do not reach quorum either.
276        assert!(!commit_state.pending_blocks.is_empty());
277
278        let metrics = &self.context.metrics.node_metrics;
279        let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280        for (block_ref, num_transactions) in pending_blocks {
281            let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283            metrics
284                .finalizer_transaction_status
285                .with_label_values(&["direct_finalize"])
286                .inc_by((num_transactions - reject_votes.len()) as u64);
287            let hostname = &self.context.committee.authority(block_ref.author).hostname;
288            metrics
289                .finalizer_reject_votes
290                .with_label_values(&[hostname])
291                .inc_by(reject_votes.len() as u64);
292            // If a transaction_index does not exist in reject_votes, the transaction has no reject votes.
293            // So it is finalized and does not need to be added to pending_transactions.
294            for (transaction_index, stake) in reject_votes {
295                // If the transaction has > 0 but < 2f+1 reject votes, it is still pending.
296                // Otherwise, it is rejected.
297                let entry = if stake < self.context.committee.quorum_threshold() {
298                    commit_state
299                        .pending_transactions
300                        .entry(block_ref)
301                        .or_default()
302                } else {
303                    metrics
304                        .finalizer_transaction_status
305                        .with_label_values(&["direct_reject"])
306                        .inc();
307                    commit_state
308                        .rejected_transactions
309                        .entry(block_ref)
310                        .or_default()
311                };
312                entry.insert(transaction_index);
313            }
314        }
315    }
316
317    // Creates an entry in the blocks map for each block in the commit,
318    // and have its ancestors link to the block.
319    fn link_blocks_in_last_commit(&mut self) {
320        let commit_state = self
321            .pending_commits
322            .back_mut()
323            .unwrap_or_else(|| panic!("No pending commit."));
324
325        // Link blocks in ascending order of round, to ensure ancestor block states are created
326        // before they are linked from.
327        let mut blocks = commit_state.commit.blocks.clone();
328        blocks.sort_by_key(|b| b.round());
329
330        let mut blocks_map = self.blocks.write();
331        for block in blocks {
332            let block_ref = block.reference();
333            // Link ancestors to the block.
334            for ancestor in block.ancestors() {
335                // Ancestor may not exist in the blocks map if it has been finalized or gc'ed.
336                // So skip linking if the ancestor does not exist.
337                if let Some(ancestor_block) = blocks_map.get(ancestor) {
338                    ancestor_block.write().children.insert(block_ref);
339                }
340            }
341            // Initialize the block state.
342            blocks_map
343                .entry(block_ref)
344                .or_insert_with(|| RwLock::new(BlockState::new(block)));
345        }
346    }
347
348    /// Updates the set of origin descendants, by appending blocks from the last commit to
349    /// origin descendants of previous linked blocks from the same origin.
350    ///
351    /// The purpose of maintaining the origin descendants per block is to save bandwidth by avoiding to explicitly
352    /// list all accept votes on transactions in blocks.
353    /// Instead when an ancestor block Ba is first included by a proposed block Bp, reject votes for transactions in Ba
354    /// are explicitly listed (if they exist). The rest of non-rejected transactions in Ba are assumed to be accepted by Bp.
355    /// This vote compression rule must be applied during vote aggregation as well.
356    ///
357    /// The above rule is equivalent to saying that transactions in a block can only be voted on by its immediate descendants.
358    /// A block Bp is an **immediate descendant** of Ba, if any directed path from Bp to Ba does not contain a block from Bp's own authority.
359    ///
360    /// This rule implies the following optimization is possible: after collecting votes for Ba from block Bp,
361    /// we can skip collecting votes from Bp's **origin descendants** (descendant blocks from the
362    /// same authority), because they cannot vote on Ba anyway.
363    ///
364    /// This vote compression rule is easy to implement when proposing blocks. Reject votes can be gathered against
365    /// all the newly included ancestors of the proposed block. But vote decompression is trickier to get right.
366    /// One edge case is when a block may not be an immediate descendant, because of GC. In this case votes from the
367    /// block should not be counted.
368    fn append_origin_descendants_from_last_commit(&mut self) {
369        let commit_state = self
370            .pending_commits
371            .back_mut()
372            .unwrap_or_else(|| panic!("No pending commit."));
373        let mut committed_blocks = commit_state.commit.blocks.clone();
374        committed_blocks.sort_by_key(|b| b.round());
375        let blocks_map = self.blocks.read();
376        for committed_block in committed_blocks {
377            let committed_block_ref = committed_block.reference();
378            // Each block must have at least one ancestor.
379            // Block verification ensures the first ancestor is from the block's own authority.
380            // Also, block verification ensures each authority appears at most once among ancestors.
381            let mut origin_ancestor_ref = *blocks_map
382                .get(&committed_block_ref)
383                .unwrap()
384                .read()
385                .block
386                .ancestors()
387                .first()
388                .unwrap();
389            while origin_ancestor_ref.author == committed_block_ref.author {
390                let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
391                    break;
392                };
393                origin_ancestor_block
394                    .write()
395                    .origin_descendants
396                    .push(committed_block_ref);
397                origin_ancestor_ref = *origin_ancestor_block
398                    .read()
399                    .block
400                    .ancestors()
401                    .first()
402                    .unwrap();
403            }
404        }
405    }
406
407    // Tries indirectly finalizing the buffered commits at the given index.
408    async fn try_indirect_finalize_first_commit(&mut self) {
409        // Ensure direct finalization has been attempted for the commit.
410        assert!(!self.pending_commits.is_empty());
411        assert!(self.pending_commits[0].pending_blocks.is_empty());
412
413        // Optional optimization: re-check pending transactions to see if they are rejected by a quorum now.
414        self.check_pending_transactions_in_first_commit();
415
416        // Check if remaining pending transactions can be finalized.
417        self.try_indirect_finalize_pending_transactions_in_first_commit()
418            .await;
419
420        // Check if remaining pending transactions can be indirectly rejected.
421        self.try_indirect_reject_pending_transactions_in_first_commit();
422    }
423
424    fn check_pending_transactions_in_first_commit(&mut self) {
425        let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
426
427        // Collect all rejected transactions without modifying state
428        for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
429            let reject_votes: BTreeMap<TransactionIndex, Stake> = self
430                .transaction_certifier
431                .get_reject_votes(block_ref)
432                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
433                .into_iter()
434                .collect();
435            let mut rejected_transactions = vec![];
436            for &transaction_index in pending_transactions {
437                // Pending transactions should always have reject votes.
438                let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
439                if reject_stake < self.context.committee.quorum_threshold() {
440                    // The transaction cannot be rejected yet.
441                    continue;
442                }
443                // Otherwise, mark the transaction for rejection.
444                rejected_transactions.push(transaction_index);
445            }
446            if !rejected_transactions.is_empty() {
447                all_rejected_transactions.push((*block_ref, rejected_transactions));
448            }
449        }
450
451        // Move rejected transactions from pending_transactions.
452        for (block_ref, rejected_transactions) in all_rejected_transactions {
453            self.context
454                .metrics
455                .node_metrics
456                .finalizer_transaction_status
457                .with_label_values(&["direct_late_reject"])
458                .inc_by(rejected_transactions.len() as u64);
459            let curr_commit_state = &mut self.pending_commits[0];
460            curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
461            curr_commit_state
462                .rejected_transactions
463                .entry(block_ref)
464                .or_default()
465                .extend(rejected_transactions);
466        }
467    }
468
469    async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
470        let _scope = monitored_scope(
471            "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
472        );
473
474        let pending_blocks: Vec<_> = self.pending_commits[0]
475            .pending_transactions
476            .iter()
477            .map(|(k, v)| (*k, v.clone()))
478            .collect();
479
480        // Number of blocks to process in each task.
481        const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
482
483        // Process chunks in parallel.
484        let mut all_finalized_transactions = vec![];
485        let mut join_set = JoinSet::new();
486        // TODO(fastpath): investigate using a cost based batching,
487        // for example each block has cost num authorities + pending_transactions.len().
488        for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
489            let context = self.context.clone();
490            let blocks = self.blocks.clone();
491            let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
492
493            join_set.spawn(tokio::task::spawn_blocking(move || {
494                let mut chunk_results = Vec::new();
495
496                for (block_ref, pending_transactions) in chunk {
497                    let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
498                        &context,
499                        &blocks,
500                        block_ref,
501                        pending_transactions,
502                    );
503
504                    if !finalized.is_empty() {
505                        chunk_results.push((block_ref, finalized));
506                    }
507                }
508
509                chunk_results
510            }));
511        }
512
513        // Collect results from all chunks
514        while let Some(result) = join_set.join_next().await {
515            let e = match result {
516                Ok(blocking_result) => match blocking_result {
517                    Ok(chunk_results) => {
518                        all_finalized_transactions.extend(chunk_results);
519                        continue;
520                    }
521                    Err(e) => e,
522                },
523                Err(e) => e,
524            };
525            if e.is_panic() {
526                std::panic::resume_unwind(e.into_panic());
527            }
528            tracing::info!("Process likely shutting down: {:?}", e);
529            // Ok to return. No potential inconsistency in state.
530            return;
531        }
532
533        for (block_ref, finalized_transactions) in all_finalized_transactions {
534            self.context
535                .metrics
536                .node_metrics
537                .finalizer_transaction_status
538                .with_label_values(&["indirect_finalize"])
539                .inc_by(finalized_transactions.len() as u64);
540            // Remove finalized transactions from pending transactions.
541            self.pending_commits[0]
542                .remove_pending_transactions(&block_ref, &finalized_transactions);
543        }
544    }
545
546    fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
547        let curr_leader_round = self.pending_commits[0].commit.leader.round;
548        let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
549        if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
550            let curr_commit_state = &mut self.pending_commits[0];
551            // This function is called after trying to indirectly finalize pending blocks.
552            // When last commit leader round is INDIRECT_REJECT_DEPTH rounds higher or more,
553            // all pending blocks should have been finalized.
554            assert!(curr_commit_state.pending_blocks.is_empty());
555            // This function is called after trying to indirectly finalize pending transactions.
556            // All remaining pending transactions, since they are not finalized, should now be
557            // indirectly rejected.
558            let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
559            for (block_ref, pending_transactions) in pending_transactions {
560                self.context
561                    .metrics
562                    .node_metrics
563                    .finalizer_transaction_status
564                    .with_label_values(&["indirect_reject"])
565                    .inc_by(pending_transactions.len() as u64);
566                curr_commit_state
567                    .rejected_transactions
568                    .entry(block_ref)
569                    .or_default()
570                    .extend(pending_transactions);
571            }
572        }
573    }
574
575    // Returns the indices of the requested pending transactions that are indirectly finalized.
576    // This function is used for checking finalization of transactions, so it must traverse
577    // all blocks which can contribute to the requested transactions' finalizations.
578    fn try_indirect_finalize_pending_transactions_in_block(
579        context: &Arc<Context>,
580        blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
581        pending_block_ref: BlockRef,
582        pending_transactions: BTreeSet<TransactionIndex>,
583    ) -> Vec<TransactionIndex> {
584        if pending_transactions.is_empty() {
585            return vec![];
586        }
587        let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
588            pending_transactions
589                .into_iter()
590                .map(|transaction_index| (transaction_index, StakeAggregator::new()))
591                .collect();
592        let mut finalized_transactions = vec![];
593        let blocks_map = blocks.read();
594        // Use BTreeSet to ensure always visit blocks in the earliest round.
595        let mut to_visit_blocks = blocks_map
596            .get(&pending_block_ref)
597            .unwrap()
598            .read()
599            .children
600            .clone();
601        // Blocks that have been visited.
602        let mut visited = BTreeSet::new();
603        // Blocks where votes and origin descendants should be ignored for processing.
604        let mut ignored = BTreeSet::new();
605        // Traverse children blocks breadth-first and accumulate accept votes for pending transactions.
606        while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
607            if !visited.insert(curr_block_ref) {
608                continue;
609            }
610            let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
611            // The first ancestor of current block should have the same origin / author as the current block.
612            // If it is not found in the blocks map but have round higher than the pending block, it might have
613            // voted on the pending block but have been GC'ed.
614            // Because the GC'ed block might have voted on the pending block and rejected some of the pending transactions,
615            // we cannot assume current block is voting to accept transactions from the pending block.
616            let curr_origin_ancestor_ref = curr_block_state.block.ancestors().first().unwrap();
617            let skip_votes = curr_block_ref.author == curr_origin_ancestor_ref.author
618                && pending_block_ref.round < curr_origin_ancestor_ref.round
619                && !blocks_map.contains_key(curr_origin_ancestor_ref);
620            // Skip counting votes from the block if it has been marked to be ignored.
621            if ignored.insert(curr_block_ref) {
622                // Skip collecting votes from origin descendants of current block.
623                // Votes from origin descendants of current block do not count for these transactions.
624                // Consider this case: block B is an origin descendant of block A (from the same authority),
625                // and both blocks A and B link to another block C.
626                // Only B's implicit and explicit transaction votes on C are considered.
627                // None of A's implicit or explicit transaction votes on C should be considered.
628                //
629                // See append_origin_descendants_from_last_commit() for more details.
630                ignored.extend(curr_block_state.origin_descendants.iter());
631                // Skip counting votes from current block if the votes on pending block could have been
632                // casted by an earlier block from the same origin.
633                // Note: if the current block casts reject votes on transactions in the pending block,
634                // it can be assumed that accept votes are also casted to other transactions in the pending block.
635                // But we choose to skip counting the accept votes in this edge case for simplicity.
636                if context.protocol_config.consensus_skip_gced_accept_votes() && skip_votes {
637                    let hostname = &context.committee.authority(curr_block_ref.author).hostname;
638                    context
639                        .metrics
640                        .node_metrics
641                        .finalizer_skipped_voting_blocks
642                        .with_label_values(&[hostname])
643                        .inc();
644                    continue;
645                }
646                // Get reject votes from current block to the pending block.
647                let curr_block_reject_votes = curr_block_state
648                    .reject_votes
649                    .get(&pending_block_ref)
650                    .cloned()
651                    .unwrap_or_default();
652                // Because of lifetime, first collect finalized transactions, and then remove them from accept_votes.
653                let mut newly_finalized = vec![];
654                for (index, stake) in &mut accept_votes {
655                    // Skip if the transaction has been rejected by the current block.
656                    if curr_block_reject_votes.contains(index) {
657                        continue;
658                    }
659                    // Skip if the total stake has not reached quorum.
660                    if !stake.add(curr_block_ref.author, &context.committee) {
661                        continue;
662                    }
663                    newly_finalized.push(*index);
664                    finalized_transactions.push(*index);
665                }
666                // There is no need to aggregate additional votes for already finalized transactions.
667                for index in newly_finalized {
668                    accept_votes.remove(&index);
669                }
670                // End traversal if all blocks and requested transactions have reached quorum.
671                if accept_votes.is_empty() {
672                    break;
673                }
674            }
675            // Add additional children blocks to visit.
676            to_visit_blocks.extend(
677                curr_block_state
678                    .children
679                    .iter()
680                    .filter(|b| !visited.contains(*b)),
681            );
682        }
683        finalized_transactions
684    }
685
686    fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
687        let mut finalized_commits = vec![];
688
689        while let Some(commit_state) = self.pending_commits.front() {
690            if !commit_state.pending_blocks.is_empty()
691                || !commit_state.pending_transactions.is_empty()
692            {
693                // The commit is not finalized yet.
694                break;
695            }
696
697            // Pop the finalized commit and set its rejected transactions.
698            let commit_state = self.pending_commits.pop_front().unwrap();
699            let mut commit = commit_state.commit;
700            for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
701                commit
702                    .rejected_transactions_by_block
703                    .insert(block_ref, rejected_transactions.into_iter().collect());
704            }
705
706            // Clean up committed blocks.
707            let mut blocks_map = self.blocks.write();
708            for block in commit.blocks.iter() {
709                blocks_map.remove(&block.reference());
710            }
711
712            let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
713                last_commit_state.commit.leader.round - commit.leader.round
714            } else {
715                0
716            };
717            self.context
718                .metrics
719                .node_metrics
720                .finalizer_round_delay
721                .observe(round_delay as f64);
722
723            finalized_commits.push(commit);
724        }
725
726        finalized_commits
727    }
728
729    fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
730        // GC TransactionCertifier state only with finalized commits, to ensure unfinalized transactions
731        // can access their reject votes from TransactionCertifier.
732        let gc_round = self
733            .dag_state
734            .read()
735            .calculate_gc_round(last_finalized_commit_round);
736        self.transaction_certifier.run_gc(gc_round);
737    }
738
739    #[cfg(test)]
740    fn is_empty(&self) -> bool {
741        self.pending_commits.is_empty() && self.blocks.read().is_empty()
742    }
743}
744
745struct CommitState {
746    commit: CommittedSubDag,
747    // Blocks pending finalization, mapped to the number of transactions in the block.
748    // This field is populated by all blocks in the commit, before direct finalization.
749    // After direct finalization, this field becomes empty.
750    pending_blocks: BTreeMap<BlockRef, usize>,
751    // Transactions pending indirect finalization.
752    // This field is populated after direct finalization, if pending transactions exist.
753    // Values in this field are removed as transactions are indirectly finalized or directly rejected.
754    // When both pending_blocks and pending_transactions are empty, the commit is finalized.
755    pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
756    // Transactions rejected by a quorum or indirectly, per block.
757    rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
758}
759
760impl CommitState {
761    fn new(commit: CommittedSubDag) -> Self {
762        let pending_blocks: BTreeMap<_, _> = commit
763            .blocks
764            .iter()
765            .map(|b| (b.reference(), b.transactions().len()))
766            .collect();
767        assert!(!pending_blocks.is_empty());
768        Self {
769            commit,
770            pending_blocks,
771            pending_transactions: BTreeMap::new(),
772            rejected_transactions: BTreeMap::new(),
773        }
774    }
775
776    fn remove_pending_transactions(
777        &mut self,
778        block_ref: &BlockRef,
779        transactions: &[TransactionIndex],
780    ) {
781        let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
782            return;
783        };
784        for t in transactions {
785            block_pending_txns.remove(t);
786        }
787        if block_pending_txns.is_empty() {
788            self.pending_transactions.remove(block_ref);
789        }
790    }
791}
792
793struct BlockState {
794    // Content of the block.
795    block: VerifiedBlock,
796    // Blocks which has an explicit ancestor linking to this block.
797    children: BTreeSet<BlockRef>,
798    // Reject votes casted by this block, and by linked ancestors from the same authority.
799    reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
800    // Other committed blocks that are origin descendants of this block.
801    // See the comment above append_origin_descendants_from_last_commit() for more details.
802    origin_descendants: Vec<BlockRef>,
803}
804
805impl BlockState {
806    fn new(block: VerifiedBlock) -> Self {
807        let reject_votes: BTreeMap<_, _> = block
808            .transaction_votes()
809            .iter()
810            .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
811            .collect();
812        // With at most 4 pending commits and assume 2 origin descendants per commit,
813        // there will be at most 8 origin descendants.
814        let origin_descendants = Vec::with_capacity(8);
815        Self {
816            block,
817            children: BTreeSet::new(),
818            reject_votes,
819            origin_descendants,
820        }
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use mysten_metrics::monitored_mpsc;
827    use parking_lot::RwLock;
828
829    use crate::{
830        TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
831        dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
832        test_dag_builder::DagBuilder,
833    };
834
835    use super::*;
836
837    struct Fixture {
838        context: Arc<Context>,
839        dag_state: Arc<RwLock<DagState>>,
840        transaction_certifier: TransactionCertifier,
841        linearizer: Linearizer,
842        commit_finalizer: CommitFinalizer,
843    }
844
845    impl Fixture {
846        fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
847            self.transaction_certifier
848                .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
849            self.dag_state.write().accept_blocks(blocks);
850        }
851    }
852
853    fn create_commit_finalizer_fixture() -> Fixture {
854        let (mut context, _keys) = Context::new_for_test(4);
855        context
856            .protocol_config
857            .set_consensus_gc_depth_for_testing(5);
858        context
859            .protocol_config
860            .set_consensus_skip_gced_accept_votes_for_testing(true);
861        let context = Arc::new(context);
862        let dag_state = Arc::new(RwLock::new(DagState::new(
863            context.clone(),
864            Arc::new(MemStore::new()),
865        )));
866        let linearizer = Linearizer::new(context.clone(), dag_state.clone());
867        let (blocks_sender, _blocks_receiver) =
868            monitored_mpsc::unbounded_channel("consensus_block_output");
869        let transaction_certifier = TransactionCertifier::new(
870            context.clone(),
871            Arc::new(NoopBlockVerifier {}),
872            dag_state.clone(),
873            blocks_sender,
874        );
875        let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
876        let commit_finalizer = CommitFinalizer::new(
877            context.clone(),
878            dag_state.clone(),
879            transaction_certifier.clone(),
880            commit_sender,
881        );
882        Fixture {
883            context,
884            dag_state,
885            transaction_certifier,
886            linearizer,
887            commit_finalizer,
888        }
889    }
890
891    fn create_block(
892        round: Round,
893        authority: u32,
894        mut ancestors: Vec<BlockRef>,
895        num_transactions: usize,
896        reject_votes: Vec<BlockTransactionVotes>,
897    ) -> VerifiedBlock {
898        // Move own authority ancestor to the front of the ancestors.
899        let i = ancestors
900            .iter()
901            .position(|b| b.author.value() == authority as usize)
902            .unwrap_or_else(|| {
903                panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
904            });
905        let b = ancestors.remove(i);
906        ancestors.insert(0, b);
907        // Create test block.
908        let block = TestBlock::new(round, authority)
909            .set_ancestors(ancestors)
910            .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
911            .set_transaction_votes(reject_votes)
912            .build();
913        VerifiedBlock::new_for_test(block)
914    }
915
916    #[tokio::test]
917    async fn test_direct_finalize_no_reject_votes() {
918        let mut fixture = create_commit_finalizer_fixture();
919
920        // Create round 1-4 blocks with 10 transactions each. Add these blocks to transaction certifier.
921        let mut dag_builder = DagBuilder::new(fixture.context.clone());
922        dag_builder
923            .layers(1..=4)
924            .num_transactions(10)
925            .build()
926            .persist_layers(fixture.dag_state.clone());
927        let blocks = dag_builder.all_blocks();
928        fixture
929            .transaction_certifier
930            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
931
932        // Select a round 2 block as the leader and create CommittedSubDag.
933        let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
934        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
935        assert_eq!(committed_sub_dags.len(), 1);
936        let committed_sub_dag = &committed_sub_dags[0];
937
938        // This committed sub-dag can be directly finalized.
939        let finalized_commits = fixture
940            .commit_finalizer
941            .process_commit(committed_sub_dag.clone())
942            .await;
943        assert_eq!(finalized_commits.len(), 1);
944        let finalized_commit = &finalized_commits[0];
945        assert_eq!(committed_sub_dag, finalized_commit);
946
947        // CommitFinalizer should be empty.
948        assert!(fixture.commit_finalizer.is_empty());
949    }
950
951    // Commits can be directly finalized if when they are added to commit finalizer,
952    // the rejected votes reach quorum if they exist on any transaction.
953    #[tokio::test]
954    async fn test_direct_finalize_with_reject_votes() {
955        let mut fixture = create_commit_finalizer_fixture();
956
957        // Create round 1 blocks with 10 transactions each.
958        let mut dag_builder = DagBuilder::new(fixture.context.clone());
959        dag_builder
960            .layer(1)
961            .num_transactions(10)
962            .build()
963            .persist_layers(fixture.dag_state.clone());
964        let round_1_blocks = dag_builder.all_blocks();
965        fixture.transaction_certifier.add_voted_blocks(
966            round_1_blocks
967                .iter()
968                .map(|b| {
969                    if b.author().value() != 3 {
970                        (b.clone(), vec![])
971                    } else {
972                        (b.clone(), vec![0, 3])
973                    }
974                })
975                .collect(),
976        );
977
978        // Select the block with rejected transaction.
979        let block_with_rejected_txn = round_1_blocks[3].clone();
980        let reject_vote = BlockTransactionVotes {
981            block_ref: block_with_rejected_txn.reference(),
982            rejects: vec![0, 3],
983        };
984
985        // Create round 2 blocks without authority 3's block from round 1.
986        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
987        // Leader links to block_with_rejected_txn, but other blocks do not.
988        let round_2_blocks = vec![
989            create_block(
990                2,
991                0,
992                round_1_blocks.iter().map(|b| b.reference()).collect(),
993                10,
994                vec![reject_vote.clone()],
995            ),
996            create_block(2, 1, ancestors.clone(), 10, vec![]),
997            create_block(2, 2, ancestors.clone(), 10, vec![]),
998        ];
999        fixture.add_blocks(round_2_blocks.clone());
1000
1001        // Select round 2 authority 0 block as the leader and create CommittedSubDag.
1002        let leader = round_2_blocks[0].clone();
1003        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1004        assert_eq!(committed_sub_dags.len(), 1);
1005        let committed_sub_dag = &committed_sub_dags[0];
1006        assert_eq!(committed_sub_dag.blocks.len(), 5);
1007
1008        // Create round 3 blocks voting on the leader.
1009        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1010        let round_3_blocks = vec![
1011            create_block(3, 0, ancestors.clone(), 0, vec![]),
1012            create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
1013            create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
1014            create_block(
1015                3,
1016                3,
1017                std::iter::once(round_1_blocks[3].reference())
1018                    .chain(ancestors.clone())
1019                    .collect(),
1020                0,
1021                vec![reject_vote.clone()],
1022            ),
1023        ];
1024        fixture.add_blocks(round_3_blocks.clone());
1025
1026        // Create round 4 blocks certifying the leader.
1027        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1028        let round_4_blocks = vec![
1029            create_block(4, 0, ancestors.clone(), 0, vec![]),
1030            create_block(4, 1, ancestors.clone(), 0, vec![]),
1031            create_block(4, 2, ancestors.clone(), 0, vec![]),
1032            create_block(4, 3, ancestors.clone(), 0, vec![]),
1033        ];
1034        fixture.add_blocks(round_4_blocks.clone());
1035
1036        // This committed sub-dag can be directly finalized because the rejected transactions
1037        // have a quorum of votes.
1038        let finalized_commits = fixture
1039            .commit_finalizer
1040            .process_commit(committed_sub_dag.clone())
1041            .await;
1042        assert_eq!(finalized_commits.len(), 1);
1043        let finalized_commit = &finalized_commits[0];
1044        assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1045        assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1046        assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1047        assert_eq!(
1048            finalized_commit
1049                .rejected_transactions_by_block
1050                .get(&block_with_rejected_txn.reference())
1051                .unwrap()
1052                .clone(),
1053            vec![0, 3],
1054        );
1055
1056        // CommitFinalizer should be empty.
1057        assert!(fixture.commit_finalizer.is_empty());
1058    }
1059
1060    // Test indirect finalization when:
1061    // 1. Reject votes on transaction does not reach quorum initially, but reach quorum later.
1062    // 2. Transaction is indirectly rejected.
1063    // 3. Transaction is indirectly finalized.
1064    #[tokio::test]
1065    async fn test_indirect_finalize_with_reject_votes() {
1066        let mut fixture = create_commit_finalizer_fixture();
1067
1068        // Create round 1 blocks with 10 transactions each.
1069        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1070        dag_builder
1071            .layer(1)
1072            .num_transactions(10)
1073            .build()
1074            .persist_layers(fixture.dag_state.clone());
1075        let round_1_blocks = dag_builder.all_blocks();
1076        fixture.transaction_certifier.add_voted_blocks(
1077            round_1_blocks
1078                .iter()
1079                .map(|b| {
1080                    if b.author().value() != 3 {
1081                        (b.clone(), vec![])
1082                    } else {
1083                        (b.clone(), vec![0, 3])
1084                    }
1085                })
1086                .collect(),
1087        );
1088
1089        // Select the block with rejected transaction.
1090        let block_with_rejected_txn = round_1_blocks[3].clone();
1091        // How transactions in this block will be voted:
1092        // Txn 1 (quorum reject): 1 reject vote at round 2, 1 reject vote at round 3, and 1 at round 4.
1093        // Txn 4 (indirect reject): 1 reject vote at round 3, and 1 at round 4.
1094        // Txn 7 (indirect finalize): 1 reject vote at round 3.
1095
1096        // Create round 2 blocks without authority 3.
1097        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1098        // Leader links to block_with_rejected_txn, but other blocks do not.
1099        let round_2_blocks = vec![
1100            create_block(
1101                2,
1102                0,
1103                round_1_blocks.iter().map(|b| b.reference()).collect(),
1104                10,
1105                vec![BlockTransactionVotes {
1106                    block_ref: block_with_rejected_txn.reference(),
1107                    rejects: vec![1, 4],
1108                }],
1109            ),
1110            // Use ancestors without authority 3 to avoid voting on its transactions.
1111            create_block(2, 1, ancestors.clone(), 10, vec![]),
1112            create_block(2, 2, ancestors.clone(), 10, vec![]),
1113        ];
1114        fixture.add_blocks(round_2_blocks.clone());
1115
1116        // Select round 2 authority 0 block as the a leader.
1117        let mut leaders = vec![round_2_blocks[0].clone()];
1118
1119        // Create round 3 blocks voting on the leader and casting reject votes.
1120        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1121        let round_3_blocks = vec![
1122            create_block(3, 0, ancestors.clone(), 0, vec![]),
1123            create_block(
1124                3,
1125                1,
1126                ancestors.clone(),
1127                0,
1128                vec![BlockTransactionVotes {
1129                    block_ref: block_with_rejected_txn.reference(),
1130                    rejects: vec![1, 4, 7],
1131                }],
1132            ),
1133            create_block(
1134                3,
1135                3,
1136                std::iter::once(round_1_blocks[3].reference())
1137                    .chain(ancestors.clone())
1138                    .collect(),
1139                0,
1140                vec![],
1141            ),
1142        ];
1143        fixture.add_blocks(round_3_blocks.clone());
1144        leaders.push(round_3_blocks[2].clone());
1145
1146        // Create round 4 blocks certifying the leader and casting reject votes.
1147        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1148        let round_4_blocks = vec![
1149            create_block(4, 0, ancestors.clone(), 0, vec![]),
1150            create_block(4, 1, ancestors.clone(), 0, vec![]),
1151            create_block(
1152                4,
1153                2,
1154                std::iter::once(round_2_blocks[2].reference())
1155                    .chain(ancestors.clone())
1156                    .collect(),
1157                0,
1158                vec![BlockTransactionVotes {
1159                    block_ref: block_with_rejected_txn.reference(),
1160                    rejects: vec![1],
1161                }],
1162            ),
1163            create_block(4, 3, ancestors.clone(), 0, vec![]),
1164        ];
1165        fixture.add_blocks(round_4_blocks.clone());
1166        leaders.push(round_4_blocks[1].clone());
1167
1168        // Create round 5-7 blocks without casting reject votes.
1169        // Select the last leader from round 5. It is necessary to have round 5 leader to indirectly finalize
1170        // transactions committed by round 2 leader.
1171        let mut last_round_blocks = round_4_blocks.clone();
1172        for r in 5..=7 {
1173            let ancestors: Vec<BlockRef> =
1174                last_round_blocks.iter().map(|b| b.reference()).collect();
1175            let round_blocks: Vec<_> = (0..4)
1176                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1177                .collect();
1178            fixture.add_blocks(round_blocks.clone());
1179            if r == 5 {
1180                leaders.push(round_blocks[0].clone());
1181            }
1182            last_round_blocks = round_blocks;
1183        }
1184
1185        // Create CommittedSubDag from leaders.
1186        assert_eq!(leaders.len(), 4);
1187        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1188        assert_eq!(committed_sub_dags.len(), 4);
1189
1190        // Buffering the initial 3 commits should not finalize.
1191        for commit in committed_sub_dags.iter().take(3) {
1192            let finalized_commits = fixture
1193                .commit_finalizer
1194                .process_commit(commit.clone())
1195                .await;
1196            assert_eq!(finalized_commits.len(), 0);
1197        }
1198
1199        // Buffering the 4th commit should finalize all commits.
1200        let finalized_commits = fixture
1201            .commit_finalizer
1202            .process_commit(committed_sub_dags[3].clone())
1203            .await;
1204        assert_eq!(finalized_commits.len(), 4);
1205
1206        // Check rejected transactions.
1207        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1208        assert_eq!(rejected_transactions.len(), 1);
1209        assert_eq!(
1210            rejected_transactions
1211                .get(&block_with_rejected_txn.reference())
1212                .unwrap(),
1213            &vec![1, 4]
1214        );
1215
1216        // Other commits should have no rejected transactions.
1217        for commit in finalized_commits.iter().skip(1) {
1218            assert!(commit.rejected_transactions_by_block.is_empty());
1219        }
1220
1221        // CommitFinalizer should be empty.
1222        assert!(fixture.commit_finalizer.is_empty());
1223    }
1224
1225    // Test indirect finalization when transaction is rejected due to GC.
1226    #[tokio::test]
1227    async fn test_indirect_reject_with_gc() {
1228        let mut fixture = create_commit_finalizer_fixture();
1229        assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1230
1231        // Create round 1 blocks with 10 transactions each.
1232        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1233        dag_builder
1234            .layer(1)
1235            .num_transactions(10)
1236            .build()
1237            .persist_layers(fixture.dag_state.clone());
1238        let round_1_blocks = dag_builder.all_blocks();
1239        fixture
1240            .transaction_certifier
1241            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1242
1243        // Select B1(3) to have a rejected transaction.
1244        let block_with_rejected_txn = round_1_blocks[3].clone();
1245        // How transactions in this block will be voted:
1246        // Txn 1 (GC reject): 1 reject vote at round 2. But the txn will get rejected because there are only
1247        // 2 accept votes.
1248
1249        // Create round 2 blocks, with B2(1) rejecting transaction 1 from B1(3).
1250        // Note that 3 blocks link to B1(3) without rejecting transaction 1.
1251        let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1252        let round_2_blocks = vec![
1253            create_block(2, 0, ancestors.clone(), 0, vec![]),
1254            create_block(
1255                2,
1256                1,
1257                ancestors.clone(),
1258                0,
1259                vec![BlockTransactionVotes {
1260                    block_ref: block_with_rejected_txn.reference(),
1261                    rejects: vec![1],
1262                }],
1263            ),
1264            create_block(2, 2, ancestors.clone(), 0, vec![]),
1265            create_block(2, 3, ancestors.clone(), 0, vec![]),
1266        ];
1267        fixture.add_blocks(round_2_blocks.clone());
1268
1269        // Create round 3-6 blocks without creating or linking to an authority 2 block.
1270        // The goal is to GC B2(2).
1271        let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1272            .iter()
1273            .enumerate()
1274            .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1275            .collect();
1276        for r in 3..=6 {
1277            let ancestors: Vec<BlockRef> =
1278                last_round_blocks.iter().map(|b| b.reference()).collect();
1279            last_round_blocks = [0, 1, 3]
1280                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1281                .to_vec();
1282            fixture.add_blocks(last_round_blocks.clone());
1283        }
1284
1285        // Create round 7-10 blocks and add a leader from authority 0 of each round.
1286        let mut leaders = vec![];
1287        for r in 7..=10 {
1288            let mut ancestors: Vec<BlockRef> =
1289                last_round_blocks.iter().map(|b| b.reference()).collect();
1290            last_round_blocks = (0..4)
1291                .map(|i| {
1292                    if r == 7 && i == 2 {
1293                        // Link to the GC'ed block B2(2).
1294                        ancestors.push(round_2_blocks[2].reference());
1295                    }
1296                    create_block(r, i, ancestors.clone(), 0, vec![])
1297                })
1298                .collect();
1299            leaders.push(last_round_blocks[0].clone());
1300            fixture.add_blocks(last_round_blocks.clone());
1301        }
1302
1303        // Create CommittedSubDag from leaders.
1304        assert_eq!(leaders.len(), 4);
1305        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1306        assert_eq!(committed_sub_dags.len(), 4);
1307
1308        // Ensure 1 reject vote is contained in B2(1) in commit 0.
1309        assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1310        // Ensure B2(2) is GC'ed.
1311        for commit in committed_sub_dags.iter() {
1312            assert!(!commit.blocks.contains(&round_2_blocks[2]));
1313        }
1314
1315        // Buffering the initial 3 commits should not finalize.
1316        for commit in committed_sub_dags.iter().take(3) {
1317            assert!(commit.decided_with_local_blocks);
1318            let finalized_commits = fixture
1319                .commit_finalizer
1320                .process_commit(commit.clone())
1321                .await;
1322            assert_eq!(finalized_commits.len(), 0);
1323        }
1324
1325        // Buffering the 4th commit should finalize all commits.
1326        let finalized_commits = fixture
1327            .commit_finalizer
1328            .process_commit(committed_sub_dags[3].clone())
1329            .await;
1330        assert_eq!(finalized_commits.len(), 4);
1331
1332        // Check rejected transactions.
1333        // B1(3) txn 1 gets rejected, even though there are has 3 blocks links to B1(3) without rejecting txn 1.
1334        // This is because there are only 2 accept votes for this transaction, which is less than the quorum threshold.
1335        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1336        assert_eq!(rejected_transactions.len(), 1);
1337        assert_eq!(
1338            rejected_transactions
1339                .get(&block_with_rejected_txn.reference())
1340                .unwrap(),
1341            &vec![1]
1342        );
1343
1344        // Other commits should have no rejected transactions.
1345        for commit in finalized_commits.iter().skip(1) {
1346            assert!(commit.rejected_transactions_by_block.is_empty());
1347        }
1348
1349        // CommitFinalizer should be empty.
1350        assert!(fixture.commit_finalizer.is_empty());
1351    }
1352
1353    #[tokio::test]
1354    async fn test_finalize_remote_commits_with_reject_votes() {
1355        let mut fixture: Fixture = create_commit_finalizer_fixture();
1356        let mut all_blocks = vec![];
1357
1358        // Create round 1 blocks with 10 transactions each.
1359        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1360        dag_builder.layer(1).num_transactions(10).build();
1361        let round_1_blocks = dag_builder.all_blocks();
1362        all_blocks.push(round_1_blocks.clone());
1363
1364        // Collect leaders from round 1.
1365        let mut leaders = vec![round_1_blocks[0].clone()];
1366
1367        // Create round 2-9 blocks and set leaders until round 7.
1368        let mut last_round_blocks = round_1_blocks.clone();
1369        for r in 2..=9 {
1370            let ancestors: Vec<BlockRef> =
1371                last_round_blocks.iter().map(|b| b.reference()).collect();
1372            let round_blocks: Vec<_> = (0..4)
1373                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1374                .collect();
1375            all_blocks.push(round_blocks.clone());
1376            if r <= 7 && r != 5 {
1377                leaders.push(round_blocks[r as usize % 4].clone());
1378            }
1379            last_round_blocks = round_blocks;
1380        }
1381
1382        // Leader rounds: 1, 2, 3, 4, 6, 7.
1383        assert_eq!(leaders.len(), 6);
1384
1385        async fn add_blocks_and_process_commit(
1386            fixture: &mut Fixture,
1387            leaders: &[VerifiedBlock],
1388            all_blocks: &[Vec<VerifiedBlock>],
1389            index: usize,
1390            local: bool,
1391        ) -> Vec<CommittedSubDag> {
1392            let leader = leaders[index].clone();
1393            // Add blocks related to the commit to DagState and TransactionCertifier.
1394            if local {
1395                for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1396                    fixture.add_blocks(round_blocks.clone());
1397                }
1398            } else {
1399                for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1400                    fixture.add_blocks(round_blocks.clone());
1401                }
1402            };
1403            // Generate remote commit from leader.
1404            let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1405            assert_eq!(committed_sub_dags.len(), 1);
1406            let mut remote_commit = committed_sub_dags.pop().unwrap();
1407            remote_commit.decided_with_local_blocks = local;
1408            // Process the remote commit.
1409            fixture
1410                .commit_finalizer
1411                .process_commit(remote_commit.clone())
1412                .await
1413        }
1414
1415        // Add commit 1-3 as remote commits. There should be no finalized commits.
1416        for i in 0..3 {
1417            let finalized_commits =
1418                add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1419            assert!(finalized_commits.is_empty());
1420        }
1421
1422        // Buffer round 4 commit as a remote commit. This should finalize the 1st commit at round 1.
1423        let finalized_commits =
1424            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1425        assert_eq!(finalized_commits.len(), 1);
1426        assert_eq!(finalized_commits[0].commit_ref.index, 1);
1427        assert_eq!(finalized_commits[0].leader.round, 1);
1428
1429        // Buffer round 6 (5th) commit as local commit. This should help finalize the commits at round 2 and 3.
1430        let finalized_commits =
1431            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1432        assert_eq!(finalized_commits.len(), 2);
1433        assert_eq!(finalized_commits[0].commit_ref.index, 2);
1434        assert_eq!(finalized_commits[0].leader.round, 2);
1435        assert_eq!(finalized_commits[1].commit_ref.index, 3);
1436        assert_eq!(finalized_commits[1].leader.round, 3);
1437
1438        // Buffer round 7 (6th) commit as local commit. This should help finalize the commits at round 4, 6 and 7 (itself).
1439        let finalized_commits =
1440            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1441        assert_eq!(finalized_commits.len(), 3);
1442        assert_eq!(finalized_commits[0].commit_ref.index, 4);
1443        assert_eq!(finalized_commits[0].leader.round, 4);
1444        assert_eq!(finalized_commits[1].commit_ref.index, 5);
1445        assert_eq!(finalized_commits[1].leader.round, 6);
1446        assert_eq!(finalized_commits[2].commit_ref.index, 6);
1447        assert_eq!(finalized_commits[2].leader.round, 7);
1448
1449        // CommitFinalizer should be empty.
1450        assert!(fixture.commit_finalizer.is_empty());
1451    }
1452}