consensus_core/
commit_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12    monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13    monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19    BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20    commit::DEFAULT_WAVE_LENGTH,
21    context::Context,
22    dag_state::DagState,
23    error::{ConsensusError, ConsensusResult},
24    stake_aggregator::{QuorumThreshold, StakeAggregator},
25    transaction_certifier::TransactionCertifier,
26};
27
28/// For transaction T committed at leader round R, when a new leader at round >= R + INDIRECT_REJECT_DEPTH
29/// commits and T is still not finalized, T is rejected.
30/// NOTE: 3 round is the minimum depth possible for indirect finalization and rejection.
31pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33/// Handle to CommitFinalizer, for sending CommittedSubDag.
34pub(crate) struct CommitFinalizerHandle {
35    sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39    // Sends a CommittedSubDag to CommitFinalizer, which will finalize it before sending it to execution.
40    pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41        self.sender.send(commit).map_err(|e| {
42            tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43            ConsensusError::Shutdown
44        })
45    }
46}
47
48/// CommitFinalizer accepts a continuous stream of CommittedSubDag and outputs
49/// them when they are finalized.
50/// In finalized commits, every transaction is either finalized or rejected.
51/// It runs in a separate thread, to reduce the load on the core thread.
52///
53/// Life of a finalized commit:
54///
55/// For efficiency, finalization happens first for transactions without reject votes (common case).
56/// The pending undecided transactions with reject votes are individually finalized or rejected.
57/// When there is no more pending transactions, the commit is finalized.
58///
59/// This is correct because regardless if a commit leader was directly or indirectly committed,
60/// every committed block can be considered finalized, because at least one leader certificate of the commit
61/// will be committed, which can also serve as a certificate for the block and its transactions.
62///
63/// From the earliest buffered commit, pending blocks are checked to see if they are now finalized.
64/// New finalized blocks are removed from the pending blocks, and its transactions are moved to the
65/// finalized, rejected or pending state. If the commit now has no pending blocks or transactions,
66/// the commit is finalized and popped from the buffer. The next earliest commit is then processed
67/// similarly, until either the buffer becomes empty or a commit with pending blocks or transactions
68/// is encountered.
69pub struct CommitFinalizer {
70    context: Arc<Context>,
71    dag_state: Arc<RwLock<DagState>>,
72    transaction_certifier: TransactionCertifier,
73    commit_sender: UnboundedSender<CommittedSubDag>,
74
75    // Last commit index processed by CommitFinalizer.
76    last_processed_commit: Option<CommitIndex>,
77    // Commits pending finalization.
78    pending_commits: VecDeque<CommitState>,
79    // Blocks in the pending commits.
80    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
81}
82
83impl CommitFinalizer {
84    pub fn new(
85        context: Arc<Context>,
86        dag_state: Arc<RwLock<DagState>>,
87        transaction_certifier: TransactionCertifier,
88        commit_sender: UnboundedSender<CommittedSubDag>,
89    ) -> Self {
90        Self {
91            context,
92            dag_state,
93            transaction_certifier,
94            commit_sender,
95            last_processed_commit: None,
96            pending_commits: VecDeque::new(),
97            blocks: Arc::new(RwLock::new(BTreeMap::new())),
98        }
99    }
100
101    pub(crate) fn start(
102        context: Arc<Context>,
103        dag_state: Arc<RwLock<DagState>>,
104        transaction_certifier: TransactionCertifier,
105        commit_sender: UnboundedSender<CommittedSubDag>,
106    ) -> CommitFinalizerHandle {
107        let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108        let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109        let _handle =
110            spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111        CommitFinalizerHandle { sender }
112    }
113
114    async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115        while let Some(committed_sub_dag) = receiver.recv().await {
116            let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117                || committed_sub_dag.recovered_rejected_transactions;
118            let finalized_commits = if !already_finalized {
119                self.process_commit(committed_sub_dag).await
120            } else {
121                vec![committed_sub_dag]
122            };
123            if !finalized_commits.is_empty() {
124                // Transaction certifier state should be GC'ed as soon as new commits are finalized.
125                // But this is done outside of process_commit(), because during recovery process_commit()
126                // is not called to finalize commits, but GC still needs to run.
127                self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128                let mut dag_state = self.dag_state.write();
129                if !already_finalized {
130                    // Records rejected transactions in newly finalized commits.
131                    for commit in &finalized_commits {
132                        dag_state.add_finalized_commit(
133                            commit.commit_ref,
134                            commit.rejected_transactions_by_block.clone(),
135                        );
136                    }
137                }
138                // Commits and committed blocks must be persisted to storage before sending them to Sui
139                // to execute their finalized transactions.
140                // Commit metadata and uncommitted blocks can be persisted more lazily because they are recoverable.
141                // But for simplicity, all unpersisted commits and blocks are flushed to storage.
142                dag_state.flush();
143            }
144            for commit in finalized_commits {
145                if let Err(e) = self.commit_sender.send(commit) {
146                    tracing::warn!(
147                        "Failed to send to commit handler, probably due to shutdown: {e:?}"
148                    );
149                    return;
150                }
151            }
152        }
153    }
154
155    pub async fn process_commit(
156        &mut self,
157        committed_sub_dag: CommittedSubDag,
158    ) -> Vec<CommittedSubDag> {
159        let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161        if let Some(last_processed_commit) = self.last_processed_commit {
162            assert_eq!(
163                last_processed_commit + 1,
164                committed_sub_dag.commit_ref.index
165            );
166        }
167        self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169        self.pending_commits
170            .push_back(CommitState::new(committed_sub_dag));
171
172        let mut finalized_commits = vec![];
173
174        // The prerequisite for running direct finalization on a commit is that the commit must
175        // have either a quorum of leader certificates in the local DAG, or a committed leader certificate.
176        //
177        // A leader certificate is a finalization certificate for every block in the commit.
178        // When the prerequisite holds, all blocks in the current commit can be considered finalized.
179        // And any transaction in the current commit that has not observed reject votes will never be rejected.
180        // So these transactions are directly finalized.
181        //
182        // When a commit is direct, there are a quorum of its leader certificates in the local DAG.
183        //
184        // When a commit is indirect, it implies one of its leader certificates is in the committed blocks.
185        // So a leader certificate must exist in the local DAG as well.
186        //
187        // When a commit is received through commit sync and processed as certified commit, the commit might
188        // not have a leader certificate in the local DAG. So a committed transaction might not observe any reject
189        // vote from local DAG, although it will eventually get rejected. To finalize blocks in this commit,
190        // there must be another commit with leader round >= 3 (WAVE_LENGTH) rounds above the commit leader.
191        // From the indirect commit rule, a leader certificate must exist in committed blocks for the earliest commit.
192        for i in 0..self.pending_commits.len() {
193            let commit_state = &self.pending_commits[i];
194            if commit_state.pending_blocks.is_empty() {
195                // The commit has already been processed through direct finalization.
196                continue;
197            }
198            // Direct finalization cannot happen when
199            // -  This commit is remote.
200            // -  And the latest commit is less than 3 (WAVE_LENGTH) rounds above this commit.
201            // In this case, this commit's leader certificate is not guaranteed to be in local DAG.
202            if !commit_state.commit.decided_with_local_blocks {
203                let last_commit_state = self.pending_commits.back().unwrap();
204                if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205                    > last_commit_state.commit.leader.round
206                {
207                    break;
208                }
209            }
210            self.try_direct_finalize_commit(i);
211        }
212        let direct_finalized_commits = self.pop_finalized_commits();
213        self.context
214            .metrics
215            .node_metrics
216            .finalizer_output_commits
217            .with_label_values(&["direct"])
218            .inc_by(direct_finalized_commits.len() as u64);
219        finalized_commits.extend(direct_finalized_commits);
220
221        // Indirect finalization: one or more commits cannot be directly finalized.
222        // So the pending transactions need to be checked for indirect finalization.
223        if !self.pending_commits.is_empty() {
224            // Initialize the state of the last added commit for computing indirect finalization.
225            //
226            // As long as there are remaining commits, even if the last commit has been directly finalized,
227            // its state still needs to be initialized here to help indirectly finalize previous commits.
228            // This is because the last commit may have been directly finalized, but its previous commits
229            // may not have been directly finalized.
230            self.link_blocks_in_last_commit();
231            self.append_origin_descendants_from_last_commit();
232            // Try to indirectly finalize a prefix of the buffered commits.
233            // If only one commit remains, it cannot be indirectly finalized because there is no commit afterwards,
234            // so it is excluded.
235            while self.pending_commits.len() > 1 {
236                // Stop indirect finalization when the earliest commit has not been processed
237                // through direct finalization.
238                if !self.pending_commits[0].pending_blocks.is_empty() {
239                    break;
240                }
241                // Otherwise, try to indirectly finalize the earliest commit.
242                self.try_indirect_finalize_first_commit().await;
243                let indirect_finalized_commits = self.pop_finalized_commits();
244                if indirect_finalized_commits.is_empty() {
245                    // No additional commits can be indirectly finalized.
246                    break;
247                }
248                self.context
249                    .metrics
250                    .node_metrics
251                    .finalizer_output_commits
252                    .with_label_values(&["indirect"])
253                    .inc_by(indirect_finalized_commits.len() as u64);
254                finalized_commits.extend(indirect_finalized_commits);
255            }
256        }
257
258        self.context
259            .metrics
260            .node_metrics
261            .finalizer_buffered_commits
262            .set(self.pending_commits.len() as i64);
263
264        finalized_commits
265    }
266
267    // Tries directly finalizing transactions in the commit.
268    fn try_direct_finalize_commit(&mut self, index: usize) {
269        let num_commits = self.pending_commits.len();
270        let commit_state = self
271            .pending_commits
272            .get_mut(index)
273            .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274        // Direct commit means every transaction in the commit can be considered to have a quorum of post-commit certificates,
275        // unless the transaction has reject votes that do not reach quorum either.
276        assert!(!commit_state.pending_blocks.is_empty());
277
278        let metrics = &self.context.metrics.node_metrics;
279        let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280        for (block_ref, num_transactions) in pending_blocks {
281            let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283            metrics
284                .finalizer_transaction_status
285                .with_label_values(&["direct_finalize"])
286                .inc_by((num_transactions - reject_votes.len()) as u64);
287            let hostname = &self.context.committee.authority(block_ref.author).hostname;
288            metrics
289                .finalizer_reject_votes
290                .with_label_values(&[hostname])
291                .inc_by(reject_votes.len() as u64);
292            // If a transaction_index does not exist in reject_votes, the transaction has no reject votes.
293            // So it is finalized and does not need to be added to pending_transactions.
294            for (transaction_index, stake) in reject_votes {
295                // If the transaction has > 0 but < 2f+1 reject votes, it is still pending.
296                // Otherwise, it is rejected.
297                let entry = if stake < self.context.committee.quorum_threshold() {
298                    commit_state
299                        .pending_transactions
300                        .entry(block_ref)
301                        .or_default()
302                } else {
303                    metrics
304                        .finalizer_transaction_status
305                        .with_label_values(&["direct_reject"])
306                        .inc();
307                    commit_state
308                        .rejected_transactions
309                        .entry(block_ref)
310                        .or_default()
311                };
312                entry.insert(transaction_index);
313            }
314        }
315    }
316
317    // Creates an entry in the blocks map for each block in the commit,
318    // and have its ancestors link to the block.
319    fn link_blocks_in_last_commit(&mut self) {
320        let commit_state = self
321            .pending_commits
322            .back_mut()
323            .unwrap_or_else(|| panic!("No pending commit."));
324
325        // Link blocks in ascending order of round, to ensure ancestor block states are created
326        // before they are linked from.
327        let mut blocks = commit_state.commit.blocks.clone();
328        blocks.sort_by_key(|b| b.round());
329
330        let mut blocks_map = self.blocks.write();
331        for block in blocks {
332            let block_ref = block.reference();
333            // Link ancestors to the block.
334            for ancestor in block.ancestors() {
335                // Ancestor may not exist in the blocks map if it has been finalized or gc'ed.
336                // So skip linking if the ancestor does not exist.
337                if let Some(ancestor_block) = blocks_map.get(ancestor) {
338                    ancestor_block.write().children.insert(block_ref);
339                }
340            }
341            // Initialize the block state.
342            blocks_map.entry(block_ref).or_insert_with(|| {
343                RwLock::new(BlockState::new(block, commit_state.commit.commit_ref.index))
344            });
345        }
346    }
347
348    /// Updates the set of origin descendants, by appending blocks from the last commit to
349    /// origin descendants of previous linked blocks from the same origin.
350    ///
351    /// The purpose of maintaining the origin descendants per block is to save bandwidth by avoiding to explicitly
352    /// list all accept votes on transactions in blocks.
353    /// Instead when an ancestor block Ba is first included by a proposed block Bp, reject votes for transactions in Ba
354    /// are explicitly listed (if they exist). The rest of non-rejected transactions in Ba are assumed to be accepted by Bp.
355    /// This vote compression rule must be applied during vote aggregation as well.
356    ///
357    /// The above rule is equivalent to saying that transactions in a block can only be voted on by its immediate descendants.
358    /// A block Bp is an **immediate descendant** of Ba, if any directed path from Bp to Ba does not contain a block from Bp's own authority.
359    ///
360    /// This rule implies the following optimization is possible: after collecting votes for Ba from block Bp,
361    /// we can skip collecting votes from Bp's **origin descendants** (descendant blocks from the
362    /// same authority), because they cannot vote on Ba anyway.
363    ///
364    /// This vote compression rule is easy to implement when proposing blocks. Reject votes can be gathered against
365    /// all the newly included ancestors of the proposed block. But vote decompression is trickier to get right.
366    /// One edge case is when a block may not be an immediate descendant, because of GC. In this case votes from the
367    /// block should not be counted.
368    fn append_origin_descendants_from_last_commit(&mut self) {
369        let commit_state = self
370            .pending_commits
371            .back_mut()
372            .unwrap_or_else(|| panic!("No pending commit."));
373        let mut committed_blocks = commit_state.commit.blocks.clone();
374        committed_blocks.sort_by_key(|b| b.round());
375        let blocks_map = self.blocks.read();
376        for committed_block in committed_blocks {
377            let committed_block_ref = committed_block.reference();
378            // Each block must have at least one ancestor.
379            // Block verification ensures the first ancestor is from the block's own authority.
380            // Also, block verification ensures each authority appears at most once among ancestors.
381            let mut origin_ancestor_ref = *blocks_map
382                .get(&committed_block_ref)
383                .unwrap()
384                .read()
385                .block
386                .ancestors()
387                .first()
388                .unwrap();
389            while origin_ancestor_ref.author == committed_block_ref.author {
390                let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
391                    break;
392                };
393                origin_ancestor_block
394                    .write()
395                    .origin_descendants
396                    .push(committed_block_ref);
397                origin_ancestor_ref = *origin_ancestor_block
398                    .read()
399                    .block
400                    .ancestors()
401                    .first()
402                    .unwrap();
403            }
404        }
405    }
406
407    // Tries indirectly finalizing the buffered commits at the given index.
408    async fn try_indirect_finalize_first_commit(&mut self) {
409        // Ensure direct finalization has been attempted for the commit.
410        assert!(!self.pending_commits.is_empty());
411        assert!(self.pending_commits[0].pending_blocks.is_empty());
412
413        // Optional optimization: re-check pending transactions to see if they are rejected by a quorum now.
414        self.check_pending_transactions_in_first_commit();
415
416        // Check if remaining pending transactions can be finalized.
417        self.try_indirect_finalize_pending_transactions_in_first_commit()
418            .await;
419
420        // Check if remaining pending transactions can be indirectly rejected.
421        self.try_indirect_reject_pending_transactions_in_first_commit();
422    }
423
424    fn check_pending_transactions_in_first_commit(&mut self) {
425        let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
426
427        // Collect all rejected transactions without modifying state
428        for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
429            let reject_votes: BTreeMap<TransactionIndex, Stake> = self
430                .transaction_certifier
431                .get_reject_votes(block_ref)
432                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
433                .into_iter()
434                .collect();
435            let mut rejected_transactions = vec![];
436            for &transaction_index in pending_transactions {
437                // Pending transactions should always have reject votes.
438                let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
439                if reject_stake < self.context.committee.quorum_threshold() {
440                    // The transaction cannot be rejected yet.
441                    continue;
442                }
443                // Otherwise, mark the transaction for rejection.
444                rejected_transactions.push(transaction_index);
445            }
446            if !rejected_transactions.is_empty() {
447                all_rejected_transactions.push((*block_ref, rejected_transactions));
448            }
449        }
450
451        // Move rejected transactions from pending_transactions.
452        for (block_ref, rejected_transactions) in all_rejected_transactions {
453            self.context
454                .metrics
455                .node_metrics
456                .finalizer_transaction_status
457                .with_label_values(&["direct_late_reject"])
458                .inc_by(rejected_transactions.len() as u64);
459            let curr_commit_state = &mut self.pending_commits[0];
460            curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
461            curr_commit_state
462                .rejected_transactions
463                .entry(block_ref)
464                .or_default()
465                .extend(rejected_transactions);
466        }
467    }
468
469    async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
470        let _scope = monitored_scope(
471            "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
472        );
473
474        let pending_blocks: Vec<_> = self.pending_commits[0]
475            .pending_transactions
476            .iter()
477            .map(|(k, v)| (*k, v.clone()))
478            .collect();
479
480        let gc_rounds = self
481            .pending_commits
482            .iter()
483            .map(|c| {
484                (
485                    c.commit.commit_ref.index,
486                    self.dag_state
487                        .read()
488                        .calculate_gc_round(c.commit.leader.round),
489                )
490            })
491            .collect::<Vec<_>>();
492
493        // Number of blocks to process in each task.
494        const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
495
496        // Process chunks in parallel.
497        let mut all_finalized_transactions = vec![];
498        let mut join_set = JoinSet::new();
499        // TODO(fastpath): investigate using a cost based batching,
500        // for example each block has cost num authorities + pending_transactions.len().
501        for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
502            let context = self.context.clone();
503            let blocks = self.blocks.clone();
504            let gc_rounds = gc_rounds.clone();
505            let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
506
507            join_set.spawn(tokio::task::spawn_blocking(move || {
508                let mut chunk_results = Vec::new();
509
510                for (block_ref, pending_transactions) in chunk {
511                    let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
512                        &context,
513                        &blocks,
514                        &gc_rounds,
515                        block_ref,
516                        pending_transactions,
517                    );
518
519                    if !finalized.is_empty() {
520                        chunk_results.push((block_ref, finalized));
521                    }
522                }
523
524                chunk_results
525            }));
526        }
527
528        // Collect results from all chunks
529        while let Some(result) = join_set.join_next().await {
530            let e = match result {
531                Ok(blocking_result) => match blocking_result {
532                    Ok(chunk_results) => {
533                        all_finalized_transactions.extend(chunk_results);
534                        continue;
535                    }
536                    Err(e) => e,
537                },
538                Err(e) => e,
539            };
540            if e.is_panic() {
541                std::panic::resume_unwind(e.into_panic());
542            }
543            tracing::info!("Process likely shutting down: {:?}", e);
544            // Ok to return. No potential inconsistency in state.
545            return;
546        }
547
548        for (block_ref, finalized_transactions) in all_finalized_transactions {
549            self.context
550                .metrics
551                .node_metrics
552                .finalizer_transaction_status
553                .with_label_values(&["indirect_finalize"])
554                .inc_by(finalized_transactions.len() as u64);
555            // Remove finalized transactions from pending transactions.
556            self.pending_commits[0]
557                .remove_pending_transactions(&block_ref, &finalized_transactions);
558        }
559    }
560
561    fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
562        let curr_leader_round = self.pending_commits[0].commit.leader.round;
563        let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
564        if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
565            let curr_commit_state = &mut self.pending_commits[0];
566            // This function is called after trying to indirectly finalize pending blocks.
567            // When last commit leader round is INDIRECT_REJECT_DEPTH rounds higher or more,
568            // all pending blocks should have been finalized.
569            assert!(curr_commit_state.pending_blocks.is_empty());
570            // This function is called after trying to indirectly finalize pending transactions.
571            // All remaining pending transactions, since they are not finalized, should now be
572            // indirectly rejected.
573            let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
574            for (block_ref, pending_transactions) in pending_transactions {
575                self.context
576                    .metrics
577                    .node_metrics
578                    .finalizer_transaction_status
579                    .with_label_values(&["indirect_reject"])
580                    .inc_by(pending_transactions.len() as u64);
581                curr_commit_state
582                    .rejected_transactions
583                    .entry(block_ref)
584                    .or_default()
585                    .extend(pending_transactions);
586            }
587        }
588    }
589
590    // Returns the indices of the requested pending transactions that are indirectly finalized.
591    // This function is used for checking finalization of transactions, so it must traverse
592    // all blocks which can contribute to the requested transactions' finalizations.
593    fn try_indirect_finalize_pending_transactions_in_block(
594        context: &Arc<Context>,
595        blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
596        gc_rounds: &[(CommitIndex, Round)],
597        pending_block_ref: BlockRef,
598        pending_transactions: BTreeSet<TransactionIndex>,
599    ) -> Vec<TransactionIndex> {
600        if pending_transactions.is_empty() {
601            return vec![];
602        }
603        let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
604            pending_transactions
605                .into_iter()
606                .map(|transaction_index| (transaction_index, StakeAggregator::new()))
607                .collect();
608        let mut finalized_transactions = vec![];
609        let blocks_map = blocks.read();
610        // Use BTreeSet for to_visit_blocks, to visit blocks in the earliest round first.
611        let (pending_commit_index, mut to_visit_blocks) = {
612            let block_state = blocks_map.get(&pending_block_ref).unwrap().read();
613            (block_state.commit_index, block_state.children.clone())
614        };
615        // Blocks that have been visited.
616        let mut visited = BTreeSet::new();
617        // Blocks where votes and origin descendants should be ignored for processing.
618        let mut ignored = BTreeSet::new();
619        // Traverse children blocks breadth-first and accumulate accept votes for pending transactions.
620        while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
621            if !visited.insert(curr_block_ref) {
622                continue;
623            }
624            let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
625            // Check if transaction votes for the pending block are potentially not carried by the
626            // current block, because of GC at the current block's proposer.
627            // See comment above gced_transaction_votes_for_pending_block() for more details.
628            //
629            // Implicit transaction votes should only be considered in commit finalizer if they are definitely
630            // part of the transactions votes from the current block when it is proposed.
631            let votes_gced = Self::gced_transaction_votes_for_pending_block(
632                gc_rounds,
633                pending_block_ref.round,
634                pending_commit_index,
635                curr_block_state.commit_index,
636            );
637            // Skip counting votes from the block if it has been marked to be ignored.
638            if ignored.insert(curr_block_ref) {
639                // Skip collecting votes from origin descendants of current block.
640                // Votes from origin descendants of current block do not count for these transactions.
641                // Consider this case: block B is an origin descendant of block A (from the same authority),
642                // and both blocks A and B link to another block C.
643                // Only B's implicit and explicit transaction votes on C are considered.
644                // None of A's implicit or explicit transaction votes on C should be considered.
645                //
646                // See append_origin_descendants_from_last_commit() for more details.
647                ignored.extend(curr_block_state.origin_descendants.iter());
648                // Skip counting votes from current block if the votes on pending block could have been
649                // casted by an earlier block from the same origin.
650                // Note: if the current block casts reject votes on transactions in the pending block,
651                // it can be assumed that accept votes are also casted to other transactions in the pending block.
652                // But we choose to skip counting the accept votes in this edge case for simplicity.
653                if context.protocol_config.consensus_skip_gced_accept_votes() && votes_gced {
654                    let hostname = &context.committee.authority(curr_block_ref.author).hostname;
655                    context
656                        .metrics
657                        .node_metrics
658                        .finalizer_skipped_voting_blocks
659                        .with_label_values(&[hostname])
660                        .inc();
661                    continue;
662                }
663                // Get reject votes from current block to the pending block.
664                let curr_block_reject_votes = curr_block_state
665                    .reject_votes
666                    .get(&pending_block_ref)
667                    .cloned()
668                    .unwrap_or_default();
669                // Because of lifetime, first collect finalized transactions, and then remove them from accept_votes.
670                let mut newly_finalized = vec![];
671                for (index, stake) in &mut accept_votes {
672                    // Skip if the transaction has been rejected by the current block.
673                    if curr_block_reject_votes.contains(index) {
674                        continue;
675                    }
676                    // Skip if the total stake has not reached quorum.
677                    if !stake.add(curr_block_ref.author, &context.committee) {
678                        continue;
679                    }
680                    newly_finalized.push(*index);
681                    finalized_transactions.push(*index);
682                }
683                // There is no need to aggregate additional votes for already finalized transactions.
684                for index in newly_finalized {
685                    accept_votes.remove(&index);
686                }
687                // End traversal if all blocks and requested transactions have reached quorum.
688                if accept_votes.is_empty() {
689                    break;
690                }
691            }
692            // Add additional children blocks to visit.
693            to_visit_blocks.extend(
694                curr_block_state
695                    .children
696                    .iter()
697                    .filter(|b| !visited.contains(*b)),
698            );
699        }
700        finalized_transactions
701    }
702
703    /// Returns true if transaction votes from the current block to the pending block
704    /// could have been be GC'ed. If this is the case, the current block cannot be assumed
705    /// to have implicitly voted to accept transactions in the pending block.
706    ///
707    /// When collecting transaction votes during proposal of the current block
708    /// (via DagState::link_causal_history()), votes against blocks in the DAG
709    /// below the proposer's GC round are skipped. Implicit accept votes cannot be assumed
710    /// for these GC'ed blocks. However, blocks do not carry the GC round when they are proposed.
711    /// So this function computes the highest possible GC round when proposing the current block,
712    /// and use it as the minimum round threshold for implicit accept votes. Even if the computed
713    /// GC round here is higher than the actual GC round used by the current block, it is still
714    /// correct although less efficient.
715    ///
716    /// gc_rounds is a list of cached commit indices and the GC rounds resulting from the commits.
717    /// It must be a superset of commits in the range [pending_commit_index, current_commit_index].
718    /// The first element should have pending_commit_index, because pending commit should be the
719    /// first commit buffered in CommitFinalizer.
720    fn gced_transaction_votes_for_pending_block(
721        gc_rounds: &[(CommitIndex, Round)],
722        pending_block_round: Round,
723        pending_commit_index: CommitIndex,
724        current_commit_index: CommitIndex,
725    ) -> bool {
726        assert!(
727            pending_commit_index <= current_commit_index,
728            "Pending {pending_commit_index} should be <= current {current_commit_index}"
729        );
730        if pending_commit_index == current_commit_index {
731            return false;
732        }
733        // current_commit_index is the commit index which includes the current / voting block.
734        // When proposing the current block, the latest possible GC round is the GC round computed
735        // from the leader of the previous commit (current_commit_index - 1).
736        let (commit_index, gc_round) = *gc_rounds
737            .get((current_commit_index - 1 - pending_commit_index) as usize)
738            .unwrap();
739        assert_eq!(
740            commit_index,
741            current_commit_index - 1,
742            "Commit index mismatch {commit_index} != {current_commit_index}"
743        );
744        pending_block_round <= gc_round
745    }
746
747    fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
748        let mut finalized_commits = vec![];
749
750        while let Some(commit_state) = self.pending_commits.front() {
751            if !commit_state.pending_blocks.is_empty()
752                || !commit_state.pending_transactions.is_empty()
753            {
754                // The commit is not finalized yet.
755                break;
756            }
757
758            // Pop the finalized commit and set its rejected transactions.
759            let commit_state = self.pending_commits.pop_front().unwrap();
760            let mut commit = commit_state.commit;
761            for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
762                commit
763                    .rejected_transactions_by_block
764                    .insert(block_ref, rejected_transactions.into_iter().collect());
765            }
766
767            // Clean up committed blocks.
768            let mut blocks_map = self.blocks.write();
769            for block in commit.blocks.iter() {
770                blocks_map.remove(&block.reference());
771            }
772
773            let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
774                last_commit_state.commit.leader.round - commit.leader.round
775            } else {
776                0
777            };
778            self.context
779                .metrics
780                .node_metrics
781                .finalizer_round_delay
782                .observe(round_delay as f64);
783
784            finalized_commits.push(commit);
785        }
786
787        finalized_commits
788    }
789
790    fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
791        // GC TransactionCertifier state only with finalized commits, to ensure unfinalized transactions
792        // can access their reject votes from TransactionCertifier.
793        let gc_round = self
794            .dag_state
795            .read()
796            .calculate_gc_round(last_finalized_commit_round);
797        self.transaction_certifier.run_gc(gc_round);
798    }
799
800    #[cfg(test)]
801    fn is_empty(&self) -> bool {
802        self.pending_commits.is_empty() && self.blocks.read().is_empty()
803    }
804}
805
806struct CommitState {
807    commit: CommittedSubDag,
808    // Blocks pending finalization, mapped to the number of transactions in the block.
809    // This field is populated by all blocks in the commit, before direct finalization.
810    // After direct finalization, this field becomes empty.
811    pending_blocks: BTreeMap<BlockRef, usize>,
812    // Transactions pending indirect finalization.
813    // This field is populated after direct finalization, if pending transactions exist.
814    // Values in this field are removed as transactions are indirectly finalized or directly rejected.
815    // When both pending_blocks and pending_transactions are empty, the commit is finalized.
816    pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
817    // Transactions rejected by a quorum or indirectly, per block.
818    rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
819}
820
821impl CommitState {
822    fn new(commit: CommittedSubDag) -> Self {
823        let pending_blocks: BTreeMap<_, _> = commit
824            .blocks
825            .iter()
826            .map(|b| (b.reference(), b.transactions().len()))
827            .collect();
828        assert!(!pending_blocks.is_empty());
829        Self {
830            commit,
831            pending_blocks,
832            pending_transactions: BTreeMap::new(),
833            rejected_transactions: BTreeMap::new(),
834        }
835    }
836
837    fn remove_pending_transactions(
838        &mut self,
839        block_ref: &BlockRef,
840        transactions: &[TransactionIndex],
841    ) {
842        let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
843            return;
844        };
845        for t in transactions {
846            block_pending_txns.remove(t);
847        }
848        if block_pending_txns.is_empty() {
849            self.pending_transactions.remove(block_ref);
850        }
851    }
852}
853
854struct BlockState {
855    // Content of the block.
856    block: VerifiedBlock,
857    // Blocks which has an explicit ancestor linking to this block.
858    children: BTreeSet<BlockRef>,
859    // Reject votes casted by this block, and by linked ancestors from the same authority.
860    reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
861    // Other committed blocks that are origin descendants of this block.
862    // See the comment above append_origin_descendants_from_last_commit() for more details.
863    origin_descendants: Vec<BlockRef>,
864    // Commit which contains this block.
865    commit_index: CommitIndex,
866}
867
868impl BlockState {
869    fn new(block: VerifiedBlock, commit_index: CommitIndex) -> Self {
870        let reject_votes: BTreeMap<_, _> = block
871            .transaction_votes()
872            .iter()
873            .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
874            .collect();
875        // With at most 4 pending commits and assume 2 origin descendants per commit,
876        // there will be at most 8 origin descendants.
877        let origin_descendants = Vec::with_capacity(8);
878        Self {
879            block,
880            children: BTreeSet::new(),
881            reject_votes,
882            origin_descendants,
883            commit_index,
884        }
885    }
886}
887
888#[cfg(test)]
889mod tests {
890    use mysten_metrics::monitored_mpsc;
891    use parking_lot::RwLock;
892
893    use crate::{
894        TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
895        dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
896        test_dag_builder::DagBuilder,
897    };
898
899    use super::*;
900
901    struct Fixture {
902        context: Arc<Context>,
903        dag_state: Arc<RwLock<DagState>>,
904        transaction_certifier: TransactionCertifier,
905        linearizer: Linearizer,
906        commit_finalizer: CommitFinalizer,
907    }
908
909    impl Fixture {
910        fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
911            self.transaction_certifier
912                .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
913            self.dag_state.write().accept_blocks(blocks);
914        }
915    }
916
917    fn create_commit_finalizer_fixture() -> Fixture {
918        let (mut context, _keys) = Context::new_for_test(4);
919        context
920            .protocol_config
921            .set_consensus_gc_depth_for_testing(5);
922        context
923            .protocol_config
924            .set_consensus_skip_gced_accept_votes_for_testing(true);
925        let context = Arc::new(context);
926        let dag_state = Arc::new(RwLock::new(DagState::new(
927            context.clone(),
928            Arc::new(MemStore::new()),
929        )));
930        let linearizer = Linearizer::new(context.clone(), dag_state.clone());
931        let (blocks_sender, _blocks_receiver) =
932            monitored_mpsc::unbounded_channel("consensus_block_output");
933        let transaction_certifier = TransactionCertifier::new(
934            context.clone(),
935            Arc::new(NoopBlockVerifier {}),
936            dag_state.clone(),
937            blocks_sender,
938        );
939        let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
940        let commit_finalizer = CommitFinalizer::new(
941            context.clone(),
942            dag_state.clone(),
943            transaction_certifier.clone(),
944            commit_sender,
945        );
946        Fixture {
947            context,
948            dag_state,
949            transaction_certifier,
950            linearizer,
951            commit_finalizer,
952        }
953    }
954
955    fn create_block(
956        round: Round,
957        authority: u32,
958        mut ancestors: Vec<BlockRef>,
959        num_transactions: usize,
960        reject_votes: Vec<BlockTransactionVotes>,
961    ) -> VerifiedBlock {
962        // Move own authority ancestor to the front of the ancestors.
963        let i = ancestors
964            .iter()
965            .position(|b| b.author.value() == authority as usize)
966            .unwrap_or_else(|| {
967                panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
968            });
969        let b = ancestors.remove(i);
970        ancestors.insert(0, b);
971        // Create test block.
972        let block = TestBlock::new(round, authority)
973            .set_ancestors(ancestors)
974            .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
975            .set_transaction_votes(reject_votes)
976            .build();
977        VerifiedBlock::new_for_test(block)
978    }
979
980    #[tokio::test]
981    async fn test_direct_finalize_no_reject_votes() {
982        let mut fixture = create_commit_finalizer_fixture();
983
984        // Create round 1-4 blocks with 10 transactions each. Add these blocks to transaction certifier.
985        let mut dag_builder = DagBuilder::new(fixture.context.clone());
986        dag_builder
987            .layers(1..=4)
988            .num_transactions(10)
989            .build()
990            .persist_layers(fixture.dag_state.clone());
991        let blocks = dag_builder.all_blocks();
992        fixture
993            .transaction_certifier
994            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
995
996        // Select a round 2 block as the leader and create CommittedSubDag.
997        let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
998        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
999        assert_eq!(committed_sub_dags.len(), 1);
1000        let committed_sub_dag = &committed_sub_dags[0];
1001
1002        // This committed sub-dag can be directly finalized.
1003        let finalized_commits = fixture
1004            .commit_finalizer
1005            .process_commit(committed_sub_dag.clone())
1006            .await;
1007        assert_eq!(finalized_commits.len(), 1);
1008        let finalized_commit = &finalized_commits[0];
1009        assert_eq!(committed_sub_dag, finalized_commit);
1010
1011        // CommitFinalizer should be empty.
1012        assert!(fixture.commit_finalizer.is_empty());
1013    }
1014
1015    // Commits can be directly finalized if when they are added to commit finalizer,
1016    // the rejected votes reach quorum if they exist on any transaction.
1017    #[tokio::test]
1018    async fn test_direct_finalize_with_reject_votes() {
1019        let mut fixture = create_commit_finalizer_fixture();
1020
1021        // Create round 1 blocks with 10 transactions each.
1022        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1023        dag_builder
1024            .layer(1)
1025            .num_transactions(10)
1026            .build()
1027            .persist_layers(fixture.dag_state.clone());
1028        let round_1_blocks = dag_builder.all_blocks();
1029        fixture.transaction_certifier.add_voted_blocks(
1030            round_1_blocks
1031                .iter()
1032                .map(|b| {
1033                    if b.author().value() != 3 {
1034                        (b.clone(), vec![])
1035                    } else {
1036                        (b.clone(), vec![0, 3])
1037                    }
1038                })
1039                .collect(),
1040        );
1041
1042        // Select the block with rejected transaction.
1043        let block_with_rejected_txn = round_1_blocks[3].clone();
1044        let reject_vote = BlockTransactionVotes {
1045            block_ref: block_with_rejected_txn.reference(),
1046            rejects: vec![0, 3],
1047        };
1048
1049        // Create round 2 blocks without authority 3's block from round 1.
1050        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1051        // Leader links to block_with_rejected_txn, but other blocks do not.
1052        let round_2_blocks = vec![
1053            create_block(
1054                2,
1055                0,
1056                round_1_blocks.iter().map(|b| b.reference()).collect(),
1057                10,
1058                vec![reject_vote.clone()],
1059            ),
1060            create_block(2, 1, ancestors.clone(), 10, vec![]),
1061            create_block(2, 2, ancestors.clone(), 10, vec![]),
1062        ];
1063        fixture.add_blocks(round_2_blocks.clone());
1064
1065        // Select round 2 authority 0 block as the leader and create CommittedSubDag.
1066        let leader = round_2_blocks[0].clone();
1067        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1068        assert_eq!(committed_sub_dags.len(), 1);
1069        let committed_sub_dag = &committed_sub_dags[0];
1070        assert_eq!(committed_sub_dag.blocks.len(), 5);
1071
1072        // Create round 3 blocks voting on the leader.
1073        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1074        let round_3_blocks = vec![
1075            create_block(3, 0, ancestors.clone(), 0, vec![]),
1076            create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
1077            create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
1078            create_block(
1079                3,
1080                3,
1081                std::iter::once(round_1_blocks[3].reference())
1082                    .chain(ancestors.clone())
1083                    .collect(),
1084                0,
1085                vec![reject_vote.clone()],
1086            ),
1087        ];
1088        fixture.add_blocks(round_3_blocks.clone());
1089
1090        // Create round 4 blocks certifying the leader.
1091        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1092        let round_4_blocks = vec![
1093            create_block(4, 0, ancestors.clone(), 0, vec![]),
1094            create_block(4, 1, ancestors.clone(), 0, vec![]),
1095            create_block(4, 2, ancestors.clone(), 0, vec![]),
1096            create_block(4, 3, ancestors.clone(), 0, vec![]),
1097        ];
1098        fixture.add_blocks(round_4_blocks.clone());
1099
1100        // This committed sub-dag can be directly finalized because the rejected transactions
1101        // have a quorum of votes.
1102        let finalized_commits = fixture
1103            .commit_finalizer
1104            .process_commit(committed_sub_dag.clone())
1105            .await;
1106        assert_eq!(finalized_commits.len(), 1);
1107        let finalized_commit = &finalized_commits[0];
1108        assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1109        assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1110        assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1111        assert_eq!(
1112            finalized_commit
1113                .rejected_transactions_by_block
1114                .get(&block_with_rejected_txn.reference())
1115                .unwrap()
1116                .clone(),
1117            vec![0, 3],
1118        );
1119
1120        // CommitFinalizer should be empty.
1121        assert!(fixture.commit_finalizer.is_empty());
1122    }
1123
1124    // Test indirect finalization when:
1125    // 1. Reject votes on transaction does not reach quorum initially, but reach quorum later.
1126    // 2. Transaction is indirectly rejected.
1127    // 3. Transaction is indirectly finalized.
1128    #[tokio::test]
1129    async fn test_indirect_finalize_with_reject_votes() {
1130        let mut fixture = create_commit_finalizer_fixture();
1131
1132        // Create round 1 blocks with 10 transactions each.
1133        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1134        dag_builder
1135            .layer(1)
1136            .num_transactions(10)
1137            .build()
1138            .persist_layers(fixture.dag_state.clone());
1139        let round_1_blocks = dag_builder.all_blocks();
1140        fixture.transaction_certifier.add_voted_blocks(
1141            round_1_blocks
1142                .iter()
1143                .map(|b| {
1144                    if b.author().value() != 3 {
1145                        (b.clone(), vec![])
1146                    } else {
1147                        (b.clone(), vec![0, 3])
1148                    }
1149                })
1150                .collect(),
1151        );
1152
1153        // Select the block with rejected transaction.
1154        let block_with_rejected_txn = round_1_blocks[3].clone();
1155        // How transactions in this block will be voted:
1156        // Txn 1 (quorum reject): 1 reject vote at round 2, 1 reject vote at round 3, and 1 at round 4.
1157        // Txn 4 (indirect reject): 1 reject vote at round 3, and 1 at round 4.
1158        // Txn 7 (indirect finalize): 1 reject vote at round 3.
1159
1160        // Create round 2 blocks without authority 3.
1161        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1162        // Leader links to block_with_rejected_txn, but other blocks do not.
1163        let round_2_blocks = vec![
1164            create_block(
1165                2,
1166                0,
1167                round_1_blocks.iter().map(|b| b.reference()).collect(),
1168                10,
1169                vec![BlockTransactionVotes {
1170                    block_ref: block_with_rejected_txn.reference(),
1171                    rejects: vec![1, 4],
1172                }],
1173            ),
1174            // Use ancestors without authority 3 to avoid voting on its transactions.
1175            create_block(2, 1, ancestors.clone(), 10, vec![]),
1176            create_block(2, 2, ancestors.clone(), 10, vec![]),
1177        ];
1178        fixture.add_blocks(round_2_blocks.clone());
1179
1180        // Select round 2 authority 0 block as the a leader.
1181        let mut leaders = vec![round_2_blocks[0].clone()];
1182
1183        // Create round 3 blocks voting on the leader and casting reject votes.
1184        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1185        let round_3_blocks = vec![
1186            create_block(3, 0, ancestors.clone(), 0, vec![]),
1187            create_block(
1188                3,
1189                1,
1190                ancestors.clone(),
1191                0,
1192                vec![BlockTransactionVotes {
1193                    block_ref: block_with_rejected_txn.reference(),
1194                    rejects: vec![1, 4, 7],
1195                }],
1196            ),
1197            create_block(
1198                3,
1199                3,
1200                std::iter::once(round_1_blocks[3].reference())
1201                    .chain(ancestors.clone())
1202                    .collect(),
1203                0,
1204                vec![],
1205            ),
1206        ];
1207        fixture.add_blocks(round_3_blocks.clone());
1208        leaders.push(round_3_blocks[2].clone());
1209
1210        // Create round 4 blocks certifying the leader and casting reject votes.
1211        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1212        let round_4_blocks = vec![
1213            create_block(4, 0, ancestors.clone(), 0, vec![]),
1214            create_block(4, 1, ancestors.clone(), 0, vec![]),
1215            create_block(
1216                4,
1217                2,
1218                std::iter::once(round_2_blocks[2].reference())
1219                    .chain(ancestors.clone())
1220                    .collect(),
1221                0,
1222                vec![BlockTransactionVotes {
1223                    block_ref: block_with_rejected_txn.reference(),
1224                    rejects: vec![1],
1225                }],
1226            ),
1227            create_block(4, 3, ancestors.clone(), 0, vec![]),
1228        ];
1229        fixture.add_blocks(round_4_blocks.clone());
1230        leaders.push(round_4_blocks[1].clone());
1231
1232        // Create round 5-7 blocks without casting reject votes.
1233        // Select the last leader from round 5. It is necessary to have round 5 leader to indirectly finalize
1234        // transactions committed by round 2 leader.
1235        let mut last_round_blocks = round_4_blocks.clone();
1236        for r in 5..=7 {
1237            let ancestors: Vec<BlockRef> =
1238                last_round_blocks.iter().map(|b| b.reference()).collect();
1239            let round_blocks: Vec<_> = (0..4)
1240                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1241                .collect();
1242            fixture.add_blocks(round_blocks.clone());
1243            if r == 5 {
1244                leaders.push(round_blocks[0].clone());
1245            }
1246            last_round_blocks = round_blocks;
1247        }
1248
1249        // Create CommittedSubDag from leaders.
1250        assert_eq!(leaders.len(), 4);
1251        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1252        assert_eq!(committed_sub_dags.len(), 4);
1253
1254        // Buffering the initial 3 commits should not finalize.
1255        for commit in committed_sub_dags.iter().take(3) {
1256            let finalized_commits = fixture
1257                .commit_finalizer
1258                .process_commit(commit.clone())
1259                .await;
1260            assert_eq!(finalized_commits.len(), 0);
1261        }
1262
1263        // Buffering the 4th commit should finalize all commits.
1264        let finalized_commits = fixture
1265            .commit_finalizer
1266            .process_commit(committed_sub_dags[3].clone())
1267            .await;
1268        assert_eq!(finalized_commits.len(), 4);
1269
1270        // Check rejected transactions.
1271        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1272        assert_eq!(rejected_transactions.len(), 1);
1273        assert_eq!(
1274            rejected_transactions
1275                .get(&block_with_rejected_txn.reference())
1276                .unwrap(),
1277            &vec![1, 4]
1278        );
1279
1280        // Other commits should have no rejected transactions.
1281        for commit in finalized_commits.iter().skip(1) {
1282            assert!(commit.rejected_transactions_by_block.is_empty());
1283        }
1284
1285        // CommitFinalizer should be empty.
1286        assert!(fixture.commit_finalizer.is_empty());
1287    }
1288
1289    // Test indirect finalization when transaction is rejected due to GC.
1290    #[tokio::test]
1291    async fn test_indirect_reject_with_gc() {
1292        let mut fixture = create_commit_finalizer_fixture();
1293        assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1294
1295        // Create round 1 blocks with 10 transactions each.
1296        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1297        dag_builder
1298            .layer(1)
1299            .num_transactions(10)
1300            .build()
1301            .persist_layers(fixture.dag_state.clone());
1302        let round_1_blocks = dag_builder.all_blocks();
1303        fixture
1304            .transaction_certifier
1305            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1306
1307        // Select B1(3) to have a rejected transaction.
1308        let block_with_rejected_txn = round_1_blocks[3].clone();
1309        // How transactions in this block will be voted:
1310        // Txn 1 (GC reject): 1 reject vote at round 2. But the txn will get rejected because there are only
1311        // 2 accept votes.
1312
1313        // Create round 2 blocks, with B2(1) rejecting transaction 1 from B1(3).
1314        // Note that 3 blocks link to B1(3) without rejecting transaction 1.
1315        let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1316        let round_2_blocks = vec![
1317            create_block(2, 0, ancestors.clone(), 0, vec![]),
1318            create_block(
1319                2,
1320                1,
1321                ancestors.clone(),
1322                0,
1323                vec![BlockTransactionVotes {
1324                    block_ref: block_with_rejected_txn.reference(),
1325                    rejects: vec![1],
1326                }],
1327            ),
1328            create_block(2, 2, ancestors.clone(), 0, vec![]),
1329            create_block(2, 3, ancestors.clone(), 0, vec![]),
1330        ];
1331        fixture.add_blocks(round_2_blocks.clone());
1332
1333        // Create round 3-6 blocks without creating or linking to an authority 2 block.
1334        // The goal is to GC B2(2).
1335        let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1336            .iter()
1337            .enumerate()
1338            .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1339            .collect();
1340        for r in 3..=6 {
1341            let ancestors: Vec<BlockRef> =
1342                last_round_blocks.iter().map(|b| b.reference()).collect();
1343            last_round_blocks = [0, 1, 3]
1344                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1345                .to_vec();
1346            fixture.add_blocks(last_round_blocks.clone());
1347        }
1348
1349        // Create round 7-10 blocks and add a leader from authority 0 of each round.
1350        let mut leaders = vec![];
1351        for r in 7..=10 {
1352            let mut ancestors: Vec<BlockRef> =
1353                last_round_blocks.iter().map(|b| b.reference()).collect();
1354            last_round_blocks = (0..4)
1355                .map(|i| {
1356                    if r == 7 && i == 2 {
1357                        // Link to the GC'ed block B2(2).
1358                        ancestors.push(round_2_blocks[2].reference());
1359                    }
1360                    create_block(r, i, ancestors.clone(), 0, vec![])
1361                })
1362                .collect();
1363            leaders.push(last_round_blocks[0].clone());
1364            fixture.add_blocks(last_round_blocks.clone());
1365        }
1366
1367        // Create CommittedSubDag from leaders.
1368        assert_eq!(leaders.len(), 4);
1369        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1370        assert_eq!(committed_sub_dags.len(), 4);
1371
1372        // Ensure 1 reject vote is contained in B2(1) in commit 0.
1373        assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1374        // Ensure B2(2) is GC'ed.
1375        for commit in committed_sub_dags.iter() {
1376            assert!(!commit.blocks.contains(&round_2_blocks[2]));
1377        }
1378
1379        // Buffering the initial 3 commits should not finalize.
1380        for commit in committed_sub_dags.iter().take(3) {
1381            assert!(commit.decided_with_local_blocks);
1382            let finalized_commits = fixture
1383                .commit_finalizer
1384                .process_commit(commit.clone())
1385                .await;
1386            assert_eq!(finalized_commits.len(), 0);
1387        }
1388
1389        // Buffering the 4th commit should finalize all commits.
1390        let finalized_commits = fixture
1391            .commit_finalizer
1392            .process_commit(committed_sub_dags[3].clone())
1393            .await;
1394        assert_eq!(finalized_commits.len(), 4);
1395
1396        // Check rejected transactions.
1397        // B1(3) txn 1 gets rejected, even though there are has 3 blocks links to B1(3) without rejecting txn 1.
1398        // This is because there are only 2 accept votes for this transaction, which is less than the quorum threshold.
1399        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1400        assert_eq!(rejected_transactions.len(), 1);
1401        assert_eq!(
1402            rejected_transactions
1403                .get(&block_with_rejected_txn.reference())
1404                .unwrap(),
1405            &vec![1]
1406        );
1407
1408        // Other commits should have no rejected transactions.
1409        for commit in finalized_commits.iter().skip(1) {
1410            assert!(commit.rejected_transactions_by_block.is_empty());
1411        }
1412
1413        // CommitFinalizer should be empty.
1414        assert!(fixture.commit_finalizer.is_empty());
1415    }
1416
1417    #[tokio::test]
1418    async fn test_finalize_remote_commits_with_reject_votes() {
1419        let mut fixture: Fixture = create_commit_finalizer_fixture();
1420        let mut all_blocks = vec![];
1421
1422        // Create round 1 blocks with 10 transactions each.
1423        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1424        dag_builder.layer(1).num_transactions(10).build();
1425        let round_1_blocks = dag_builder.all_blocks();
1426        all_blocks.push(round_1_blocks.clone());
1427
1428        // Collect leaders from round 1.
1429        let mut leaders = vec![round_1_blocks[0].clone()];
1430
1431        // Create round 2-9 blocks and set leaders until round 7.
1432        let mut last_round_blocks = round_1_blocks.clone();
1433        for r in 2..=9 {
1434            let ancestors: Vec<BlockRef> =
1435                last_round_blocks.iter().map(|b| b.reference()).collect();
1436            let round_blocks: Vec<_> = (0..4)
1437                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1438                .collect();
1439            all_blocks.push(round_blocks.clone());
1440            if r <= 7 && r != 5 {
1441                leaders.push(round_blocks[r as usize % 4].clone());
1442            }
1443            last_round_blocks = round_blocks;
1444        }
1445
1446        // Leader rounds: 1, 2, 3, 4, 6, 7.
1447        assert_eq!(leaders.len(), 6);
1448
1449        async fn add_blocks_and_process_commit(
1450            fixture: &mut Fixture,
1451            leaders: &[VerifiedBlock],
1452            all_blocks: &[Vec<VerifiedBlock>],
1453            index: usize,
1454            local: bool,
1455        ) -> Vec<CommittedSubDag> {
1456            let leader = leaders[index].clone();
1457            // Add blocks related to the commit to DagState and TransactionCertifier.
1458            if local {
1459                for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1460                    fixture.add_blocks(round_blocks.clone());
1461                }
1462            } else {
1463                for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1464                    fixture.add_blocks(round_blocks.clone());
1465                }
1466            };
1467            // Generate remote commit from leader.
1468            let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1469            assert_eq!(committed_sub_dags.len(), 1);
1470            let mut remote_commit = committed_sub_dags.pop().unwrap();
1471            remote_commit.decided_with_local_blocks = local;
1472            // Process the remote commit.
1473            fixture
1474                .commit_finalizer
1475                .process_commit(remote_commit.clone())
1476                .await
1477        }
1478
1479        // Add commit 1-3 as remote commits. There should be no finalized commits.
1480        for i in 0..3 {
1481            let finalized_commits =
1482                add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1483            assert!(finalized_commits.is_empty());
1484        }
1485
1486        // Buffer round 4 commit as a remote commit. This should finalize the 1st commit at round 1.
1487        let finalized_commits =
1488            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1489        assert_eq!(finalized_commits.len(), 1);
1490        assert_eq!(finalized_commits[0].commit_ref.index, 1);
1491        assert_eq!(finalized_commits[0].leader.round, 1);
1492
1493        // Buffer round 6 (5th) commit as local commit. This should help finalize the commits at round 2 and 3.
1494        let finalized_commits =
1495            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1496        assert_eq!(finalized_commits.len(), 2);
1497        assert_eq!(finalized_commits[0].commit_ref.index, 2);
1498        assert_eq!(finalized_commits[0].leader.round, 2);
1499        assert_eq!(finalized_commits[1].commit_ref.index, 3);
1500        assert_eq!(finalized_commits[1].leader.round, 3);
1501
1502        // Buffer round 7 (6th) commit as local commit. This should help finalize the commits at round 4, 6 and 7 (itself).
1503        let finalized_commits =
1504            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1505        assert_eq!(finalized_commits.len(), 3);
1506        assert_eq!(finalized_commits[0].commit_ref.index, 4);
1507        assert_eq!(finalized_commits[0].leader.round, 4);
1508        assert_eq!(finalized_commits[1].commit_ref.index, 5);
1509        assert_eq!(finalized_commits[1].leader.round, 6);
1510        assert_eq!(finalized_commits[2].commit_ref.index, 6);
1511        assert_eq!(finalized_commits[2].leader.round, 7);
1512
1513        // CommitFinalizer should be empty.
1514        assert!(fixture.commit_finalizer.is_empty());
1515    }
1516}