consensus_core/
commit_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12    monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13    monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16use tokio::task::JoinSet;
17
18use crate::{
19    BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
20    commit::DEFAULT_WAVE_LENGTH,
21    context::Context,
22    dag_state::DagState,
23    error::{ConsensusError, ConsensusResult},
24    stake_aggregator::{QuorumThreshold, StakeAggregator},
25    transaction_certifier::TransactionCertifier,
26};
27
/// For transaction T committed at leader round R, when a new leader at round >= R + INDIRECT_REJECT_DEPTH
/// commits and T is still not finalized, T is rejected.
/// NOTE: 3 rounds is the minimum depth possible for indirect finalization and rejection.
pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
32
33/// Handle to CommitFinalizer, for sending CommittedSubDag.
34pub(crate) struct CommitFinalizerHandle {
35    sender: UnboundedSender<CommittedSubDag>,
36}
37
38impl CommitFinalizerHandle {
39    // Sends a CommittedSubDag to CommitFinalizer, which will finalize it before sending it to execution.
40    pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
41        self.sender.send(commit).map_err(|e| {
42            tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
43            ConsensusError::Shutdown
44        })
45    }
46}
47
/// CommitFinalizer accepts a continuous stream of CommittedSubDag and outputs
/// them when they are finalized.
/// In finalized commits, every transaction is either finalized or rejected.
/// It runs in a separate thread, to reduce the load on the core thread.
///
/// Life of a finalized commit:
///
/// For efficiency, finalization happens first for transactions without reject votes (common case).
/// The pending undecided transactions with reject votes are individually finalized or rejected.
/// When there are no more pending transactions, the commit is finalized.
///
/// This is correct because regardless of whether a commit leader was directly or indirectly committed,
/// every committed block can be considered finalized, because at least one leader certificate of the commit
/// will be committed, which can also serve as a certificate for the block and its transactions.
///
/// From the earliest buffered commit, pending blocks are checked to see if they are now finalized.
/// Newly finalized blocks are removed from the pending blocks, and their transactions are moved to the
/// finalized, rejected or pending state. If the commit now has no pending blocks or transactions,
/// the commit is finalized and popped from the buffer. The next earliest commit is then processed
/// similarly, until either the buffer becomes empty or a commit with pending blocks or transactions
/// is encountered.
pub struct CommitFinalizer {
    context: Arc<Context>,
    dag_state: Arc<RwLock<DagState>>,
    transaction_certifier: TransactionCertifier,
    // Output channel for finalized commits.
    commit_sender: UnboundedSender<CommittedSubDag>,

    // Last commit index processed by CommitFinalizer.
    // None until the first commit goes through process_commit().
    last_processed_commit: Option<CommitIndex>,
    // Commits pending finalization, ordered from the earliest to the latest.
    pending_commits: VecDeque<CommitState>,
    // Blocks in the pending commits.
    // Held behind an Arc so that the blocking tasks spawned for indirect finalization
    // can read the map as well.
    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
}
82
83impl CommitFinalizer {
84    pub fn new(
85        context: Arc<Context>,
86        dag_state: Arc<RwLock<DagState>>,
87        transaction_certifier: TransactionCertifier,
88        commit_sender: UnboundedSender<CommittedSubDag>,
89    ) -> Self {
90        Self {
91            context,
92            dag_state,
93            transaction_certifier,
94            commit_sender,
95            last_processed_commit: None,
96            pending_commits: VecDeque::new(),
97            blocks: Arc::new(RwLock::new(BTreeMap::new())),
98        }
99    }
100
101    pub(crate) fn start(
102        context: Arc<Context>,
103        dag_state: Arc<RwLock<DagState>>,
104        transaction_certifier: TransactionCertifier,
105        commit_sender: UnboundedSender<CommittedSubDag>,
106    ) -> CommitFinalizerHandle {
107        let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
108        let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
109        let _handle =
110            spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
111        CommitFinalizerHandle { sender }
112    }
113
114    async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
115        while let Some(committed_sub_dag) = receiver.recv().await {
116            let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
117                || committed_sub_dag.recovered_rejected_transactions;
118            let finalized_commits = if !already_finalized {
119                self.process_commit(committed_sub_dag).await
120            } else {
121                vec![committed_sub_dag]
122            };
123            if !finalized_commits.is_empty() {
124                // Transaction certifier state should be GC'ed as soon as new commits are finalized.
125                // But this is done outside of process_commit(), because during recovery process_commit()
126                // is not called to finalize commits, but GC still needs to run.
127                self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
128                let mut dag_state = self.dag_state.write();
129                if !already_finalized {
130                    // Records rejected transactions in newly finalized commits.
131                    for commit in &finalized_commits {
132                        dag_state.add_finalized_commit(
133                            commit.commit_ref,
134                            commit.rejected_transactions_by_block.clone(),
135                        );
136                    }
137                }
138                // Commits and committed blocks must be persisted to storage before sending them to Sui
139                // to execute their finalized transactions.
140                // Commit metadata and uncommitted blocks can be persisted more lazily because they are recoverable.
141                // But for simplicity, all unpersisted commits and blocks are flushed to storage.
142                dag_state.flush();
143            }
144            for commit in finalized_commits {
145                if let Err(e) = self.commit_sender.send(commit) {
146                    tracing::warn!(
147                        "Failed to send to commit handler, probably due to shutdown: {e:?}"
148                    );
149                    return;
150                }
151            }
152        }
153    }
154
155    pub async fn process_commit(
156        &mut self,
157        committed_sub_dag: CommittedSubDag,
158    ) -> Vec<CommittedSubDag> {
159        let _scope = monitored_scope("CommitFinalizer::process_commit");
160
161        if let Some(last_processed_commit) = self.last_processed_commit {
162            assert_eq!(
163                last_processed_commit + 1,
164                committed_sub_dag.commit_ref.index
165            );
166        }
167        self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
168
169        self.pending_commits
170            .push_back(CommitState::new(committed_sub_dag));
171
172        let mut finalized_commits = vec![];
173
174        // The prerequisite for running direct finalization on a commit is that the commit must
175        // have either a quorum of leader certificates in the local DAG, or a committed leader certificate.
176        //
177        // A leader certificate is a finalization certificate for every block in the commit.
178        // When the prerequisite holds, all blocks in the current commit can be considered finalized.
179        // And any transaction in the current commit that has not observed reject votes will never be rejected.
180        // So these transactions are directly finalized.
181        //
182        // When a commit is direct, there are a quorum of its leader certificates in the local DAG.
183        //
184        // When a commit is indirect, it implies one of its leader certificates is in the committed blocks.
185        // So a leader certificate must exist in the local DAG as well.
186        //
187        // When a commit is received through commit sync and processed as certified commit, the commit might
188        // not have a leader certificate in the local DAG. So a committed transaction might not observe any reject
189        // vote from local DAG, although it will eventually get rejected. To finalize blocks in this commit,
190        // there must be another commit with leader round >= 3 (WAVE_LENGTH) rounds above the commit leader.
191        // From the indirect commit rule, a leader certificate must exist in committed blocks for the earliest commit.
192        for i in 0..self.pending_commits.len() {
193            let commit_state = &self.pending_commits[i];
194            if commit_state.pending_blocks.is_empty() {
195                // The commit has already been processed through direct finalization.
196                continue;
197            }
198            // Direct finalization cannot happen when
199            // -  This commit is remote.
200            // -  And the latest commit is less than 3 (WAVE_LENGTH) rounds above this commit.
201            // In this case, this commit's leader certificate is not guaranteed to be in local DAG.
202            if !commit_state.commit.decided_with_local_blocks {
203                let last_commit_state = self.pending_commits.back().unwrap();
204                if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
205                    > last_commit_state.commit.leader.round
206                {
207                    break;
208                }
209            }
210            self.try_direct_finalize_commit(i);
211        }
212        let direct_finalized_commits = self.pop_finalized_commits();
213        self.context
214            .metrics
215            .node_metrics
216            .finalizer_output_commits
217            .with_label_values(&["direct"])
218            .inc_by(direct_finalized_commits.len() as u64);
219        finalized_commits.extend(direct_finalized_commits);
220
221        // Indirect finalization: one or more commits cannot be directly finalized.
222        // So the pending transactions need to be checked for indirect finalization.
223        if !self.pending_commits.is_empty() {
224            // Initialize the state of the last added commit for computing indirect finalization.
225            //
226            // As long as there are remaining commits, even if the last commit has been directly finalized,
227            // its state still needs to be initialized here to help indirectly finalize previous commits.
228            // This is because the last commit may have been directly finalized, but its previous commits
229            // may not have been directly finalized.
230            self.link_blocks_in_last_commit();
231            self.append_origin_descendants_from_last_commit();
232            // Try to indirectly finalize a prefix of the buffered commits.
233            // If only one commit remains, it cannot be indirectly finalized because there is no commit afterwards,
234            // so it is excluded.
235            while self.pending_commits.len() > 1 {
236                // Stop indirect finalization when the earliest commit has not been processed
237                // through direct finalization.
238                if !self.pending_commits[0].pending_blocks.is_empty() {
239                    break;
240                }
241                // Otherwise, try to indirectly finalize the earliest commit.
242                self.try_indirect_finalize_first_commit().await;
243                let indirect_finalized_commits = self.pop_finalized_commits();
244                if indirect_finalized_commits.is_empty() {
245                    // No additional commits can be indirectly finalized.
246                    break;
247                }
248                self.context
249                    .metrics
250                    .node_metrics
251                    .finalizer_output_commits
252                    .with_label_values(&["indirect"])
253                    .inc_by(indirect_finalized_commits.len() as u64);
254                finalized_commits.extend(indirect_finalized_commits);
255            }
256        }
257
258        self.context
259            .metrics
260            .node_metrics
261            .finalizer_buffered_commits
262            .set(self.pending_commits.len() as i64);
263
264        finalized_commits
265    }
266
267    // Tries directly finalizing transactions in the commit.
268    fn try_direct_finalize_commit(&mut self, index: usize) {
269        let num_commits = self.pending_commits.len();
270        let commit_state = self
271            .pending_commits
272            .get_mut(index)
273            .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits,));
274        // Direct commit means every transaction in the commit can be considered to have a quorum of post-commit certificates,
275        // unless the transaction has reject votes that do not reach quorum either.
276        assert!(!commit_state.pending_blocks.is_empty());
277
278        let metrics = &self.context.metrics.node_metrics;
279        let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
280        for (block_ref, num_transactions) in pending_blocks {
281            let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
282                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
283            metrics
284                .finalizer_transaction_status
285                .with_label_values(&["direct_finalize"])
286                .inc_by((num_transactions - reject_votes.len()) as u64);
287            let hostname = &self.context.committee.authority(block_ref.author).hostname;
288            metrics
289                .finalizer_reject_votes
290                .with_label_values(&[hostname])
291                .inc_by(reject_votes.len() as u64);
292            // If a transaction_index does not exist in reject_votes, the transaction has no reject votes.
293            // So it is finalized and does not need to be added to pending_transactions.
294            for (transaction_index, stake) in reject_votes {
295                // If the transaction has > 0 but < 2f+1 reject votes, it is still pending.
296                // Otherwise, it is rejected.
297                let entry = if stake < self.context.committee.quorum_threshold() {
298                    commit_state
299                        .pending_transactions
300                        .entry(block_ref)
301                        .or_default()
302                } else {
303                    metrics
304                        .finalizer_transaction_status
305                        .with_label_values(&["direct_reject"])
306                        .inc();
307                    commit_state
308                        .rejected_transactions
309                        .entry(block_ref)
310                        .or_default()
311                };
312                entry.insert(transaction_index);
313            }
314        }
315    }
316
317    // Creates an entry in the blocks map for each block in the commit,
318    // and have its ancestors link to the block.
319    fn link_blocks_in_last_commit(&mut self) {
320        let commit_state = self
321            .pending_commits
322            .back_mut()
323            .unwrap_or_else(|| panic!("No pending commit."));
324
325        // Link blocks in ascending order of round, to ensure ancestor block states are created
326        // before they are linked from.
327        let mut blocks = commit_state.commit.blocks.clone();
328        blocks.sort_by_key(|b| b.round());
329
330        let mut blocks_map = self.blocks.write();
331        for block in blocks {
332            let block_ref = block.reference();
333            // Link ancestors to the block.
334            for ancestor in block.ancestors() {
335                // Ancestor may not exist in the blocks map if it has been finalized or gc'ed.
336                // So skip linking if the ancestor does not exist.
337                if let Some(ancestor_block) = blocks_map.get(ancestor) {
338                    ancestor_block.write().children.insert(block_ref);
339                }
340            }
341            // Initialize the block state.
342            blocks_map
343                .entry(block_ref)
344                .or_insert_with(|| RwLock::new(BlockState::new(block)));
345        }
346    }
347
348    /// To save bandwidth, blocks do not include explicit accept votes on transactions.
349    /// Reject votes are included only the first time the block containing the voted-on
350    /// transaction is linked in a block. Other first time linked transactions, when
351    /// not rejected, are assumed to be accepted. This vote compression rule must also be
352    /// applied during vote aggregation.
353    ///
354    /// Transactions in a block can only be voted on by its immediate descendants.
355    /// A block is an **immediate descendant** if it can only link directly to the voted-on
356    /// block, without any intermediate blocks from its own authority. Votes from
357    /// non-immediate descendants are ignored.
358    ///
359    /// This rule implies the following optimization is possible: after collecting votes from a block,
360    /// we can skip collecting votes from its **origin descendants** (descendant blocks from the
361    /// same authority), because their votes would be ignored anyway.
362    ///
363    /// This function updates the set of origin descendants for all pending blocks using blocks
364    /// from the last commit.
365    fn append_origin_descendants_from_last_commit(&mut self) {
366        let commit_state = self
367            .pending_commits
368            .back_mut()
369            .unwrap_or_else(|| panic!("No pending commit."));
370        let mut committed_blocks = commit_state.commit.blocks.clone();
371        committed_blocks.sort_by_key(|b| b.round());
372        let blocks_map = self.blocks.read();
373        for committed_block in committed_blocks {
374            let committed_block_ref = committed_block.reference();
375            // Each block must have at least one ancestor.
376            // Block verification ensures the first ancestor is from the block's own authority.
377            // Also, block verification ensures each authority appears at most once among ancestors.
378            let mut origin_ancestor_ref = *blocks_map
379                .get(&committed_block_ref)
380                .unwrap()
381                .read()
382                .block
383                .ancestors()
384                .first()
385                .unwrap();
386            while origin_ancestor_ref.author == committed_block_ref.author {
387                let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
388                    break;
389                };
390                origin_ancestor_block
391                    .write()
392                    .origin_descendants
393                    .push(committed_block_ref);
394                origin_ancestor_ref = *origin_ancestor_block
395                    .read()
396                    .block
397                    .ancestors()
398                    .first()
399                    .unwrap();
400            }
401        }
402    }
403
404    // Tries indirectly finalizing the buffered commits at the given index.
405    async fn try_indirect_finalize_first_commit(&mut self) {
406        // Ensure direct finalization has been attempted for the commit.
407        assert!(!self.pending_commits.is_empty());
408        assert!(self.pending_commits[0].pending_blocks.is_empty());
409
410        // Optional optimization: re-check pending transactions to see if they are rejected by a quorum now.
411        self.check_pending_transactions_in_first_commit();
412
413        // Check if remaining pending transactions can be finalized.
414        self.try_indirect_finalize_pending_transactions_in_first_commit()
415            .await;
416
417        // Check if remaining pending transactions can be indirectly rejected.
418        self.try_indirect_reject_pending_transactions_in_first_commit();
419    }
420
421    fn check_pending_transactions_in_first_commit(&mut self) {
422        let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
423
424        // Collect all rejected transactions without modifying state
425        for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
426            let reject_votes: BTreeMap<TransactionIndex, Stake> = self
427                .transaction_certifier
428                .get_reject_votes(block_ref)
429                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
430                .into_iter()
431                .collect();
432            let mut rejected_transactions = vec![];
433            for &transaction_index in pending_transactions {
434                // Pending transactions should always have reject votes.
435                let reject_stake = reject_votes.get(&transaction_index).copied().unwrap();
436                if reject_stake < self.context.committee.quorum_threshold() {
437                    // The transaction cannot be rejected yet.
438                    continue;
439                }
440                // Otherwise, mark the transaction for rejection.
441                rejected_transactions.push(transaction_index);
442            }
443            if !rejected_transactions.is_empty() {
444                all_rejected_transactions.push((*block_ref, rejected_transactions));
445            }
446        }
447
448        // Move rejected transactions from pending_transactions.
449        for (block_ref, rejected_transactions) in all_rejected_transactions {
450            self.context
451                .metrics
452                .node_metrics
453                .finalizer_transaction_status
454                .with_label_values(&["direct_late_reject"])
455                .inc_by(rejected_transactions.len() as u64);
456            let curr_commit_state = &mut self.pending_commits[0];
457            curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
458            curr_commit_state
459                .rejected_transactions
460                .entry(block_ref)
461                .or_default()
462                .extend(rejected_transactions);
463        }
464    }
465
466    async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
467        let _scope = monitored_scope(
468            "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
469        );
470
471        let pending_blocks: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = self.pending_commits[0]
472            .pending_transactions
473            .iter()
474            .map(|(k, v)| (*k, v.clone()))
475            .collect();
476
477        // Number of blocks to process in each task.
478        const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
479
480        // Process chunks in parallel.
481        let mut all_finalized_transactions = vec![];
482        let mut join_set = JoinSet::new();
483        // TODO(fastpath): investigate using a cost based batching,
484        // for example each block has cost num authorities + pending_transactions.len().
485        for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
486            let context = self.context.clone();
487            let blocks = self.blocks.clone();
488            let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
489
490            join_set.spawn(tokio::task::spawn_blocking(move || {
491                let mut chunk_results = Vec::new();
492
493                for (block_ref, pending_transactions) in chunk {
494                    let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
495                        &context,
496                        &blocks,
497                        block_ref,
498                        pending_transactions,
499                    );
500
501                    if !finalized.is_empty() {
502                        chunk_results.push((block_ref, finalized));
503                    }
504                }
505
506                chunk_results
507            }));
508        }
509
510        // Collect results from all chunks
511        while let Some(result) = join_set.join_next().await {
512            let e = match result {
513                Ok(blocking_result) => match blocking_result {
514                    Ok(chunk_results) => {
515                        all_finalized_transactions.extend(chunk_results);
516                        continue;
517                    }
518                    Err(e) => e,
519                },
520                Err(e) => e,
521            };
522            if e.is_panic() {
523                std::panic::resume_unwind(e.into_panic());
524            }
525            tracing::info!("Process likely shutting down: {:?}", e);
526            // Ok to return. No potential inconsistency in state.
527            return;
528        }
529
530        for (block_ref, finalized_transactions) in all_finalized_transactions {
531            self.context
532                .metrics
533                .node_metrics
534                .finalizer_transaction_status
535                .with_label_values(&["indirect_finalize"])
536                .inc_by(finalized_transactions.len() as u64);
537            // Remove finalized transactions from pending transactions.
538            self.pending_commits[0]
539                .remove_pending_transactions(&block_ref, &finalized_transactions);
540        }
541    }
542
543    fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
544        let curr_leader_round = self.pending_commits[0].commit.leader.round;
545        let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
546        if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
547            let curr_commit_state = &mut self.pending_commits[0];
548            // This function is called after trying to indirectly finalize pending blocks.
549            // When last commit leader round is INDIRECT_REJECT_DEPTH rounds higher or more,
550            // all pending blocks should have been finalized.
551            assert!(curr_commit_state.pending_blocks.is_empty());
552            // This function is called after trying to indirectly finalize pending transactions.
553            // All remaining pending transactions, since they are not finalized, should now be
554            // indirectly rejected.
555            let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
556            for (block_ref, pending_transactions) in pending_transactions {
557                self.context
558                    .metrics
559                    .node_metrics
560                    .finalizer_transaction_status
561                    .with_label_values(&["indirect_reject"])
562                    .inc_by(pending_transactions.len() as u64);
563                curr_commit_state
564                    .rejected_transactions
565                    .entry(block_ref)
566                    .or_default()
567                    .extend(pending_transactions);
568            }
569        }
570    }
571
572    // Returns the indices of the requested pending transactions that are indirectly finalized.
573    // This function is used for checking finalization of transactions, so it must traverse
574    // all blocks which can contribute to the requested transactions' finalizations.
575    fn try_indirect_finalize_pending_transactions_in_block(
576        context: &Arc<Context>,
577        blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
578        pending_block_ref: BlockRef,
579        pending_transactions: BTreeSet<TransactionIndex>,
580    ) -> Vec<TransactionIndex> {
581        if pending_transactions.is_empty() {
582            return vec![];
583        }
584        let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
585            pending_transactions
586                .into_iter()
587                .map(|transaction_index| (transaction_index, StakeAggregator::new()))
588                .collect();
589        let mut finalized_transactions = vec![];
590        let blocks_map = blocks.read();
591        // Use BTreeSet to ensure always visit blocks in the earliest round.
592        let mut to_visit_blocks = blocks_map
593            .get(&pending_block_ref)
594            .unwrap()
595            .read()
596            .children
597            .clone();
598        // Blocks that have been visited.
599        let mut visited = BTreeSet::new();
600        // Blocks where votes and origin descendants should be ignored for processing.
601        let mut ignored = BTreeSet::new();
602        // Traverse children blocks breadth-first and accumulate accept votes for pending transactions.
603        while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
604            if !visited.insert(curr_block_ref) {
605                continue;
606            }
607            let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
608            // Ignore info from the block if its direct ancestor has been processed.
609            if ignored.insert(curr_block_ref) {
610                // Skip collecting votes from origin descendants of current block.
611                // Votes from origin descendants of current block do not count for this transactions.
612                // Consider this case: block B is an origin descendant of block A (from the same authority),
613                // and both blocks A and B link to another block C.
614                // Only B's implicit and explicit transaction votes on C are considered.
615                // None of A's implicit or explicit transaction votes on C should be considered.
616                ignored.extend(curr_block_state.origin_descendants.iter());
617                // Get reject votes from current block to the pending block.
618                let curr_block_reject_votes = curr_block_state
619                    .reject_votes
620                    .get(&pending_block_ref)
621                    .cloned()
622                    .unwrap_or_default();
623                // Because of lifetime, first collect finalized transactions, and then remove them from accept_votes.
624                let mut newly_finalized = vec![];
625                for (index, stake) in &mut accept_votes {
626                    // Skip if the transaction has been rejected by the current block.
627                    if curr_block_reject_votes.contains(index) {
628                        continue;
629                    }
630                    // Skip if the total stake has not reached quorum.
631                    if !stake.add(curr_block_ref.author, &context.committee) {
632                        continue;
633                    }
634                    newly_finalized.push(*index);
635                    finalized_transactions.push(*index);
636                }
637                // There is no need to aggregate additional votes for already finalized transactions.
638                for index in newly_finalized {
639                    accept_votes.remove(&index);
640                }
641                // End traversing if all blocks and requested transactions have reached quorum.
642                if accept_votes.is_empty() {
643                    break;
644                }
645            }
646            // Add additional children blocks to visit.
647            to_visit_blocks.extend(
648                curr_block_state
649                    .children
650                    .iter()
651                    .filter(|b| !visited.contains(*b)),
652            );
653        }
654        finalized_transactions
655    }
656
657    fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
658        let mut finalized_commits = vec![];
659
660        while let Some(commit_state) = self.pending_commits.front() {
661            if !commit_state.pending_blocks.is_empty()
662                || !commit_state.pending_transactions.is_empty()
663            {
664                // The commit is not finalized yet.
665                break;
666            }
667
668            // Pop the finalized commit and set its rejected transactions.
669            let commit_state = self.pending_commits.pop_front().unwrap();
670            let mut commit = commit_state.commit;
671            for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
672                commit
673                    .rejected_transactions_by_block
674                    .insert(block_ref, rejected_transactions.into_iter().collect());
675            }
676
677            // Clean up committed blocks.
678            let mut blocks_map = self.blocks.write();
679            for block in commit.blocks.iter() {
680                blocks_map.remove(&block.reference());
681            }
682
683            let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
684                last_commit_state.commit.leader.round - commit.leader.round
685            } else {
686                0
687            };
688            self.context
689                .metrics
690                .node_metrics
691                .finalizer_round_delay
692                .observe(round_delay as f64);
693
694            finalized_commits.push(commit);
695        }
696
697        finalized_commits
698    }
699
700    fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
701        // GC TransactionCertifier state only with finalized commits, to ensure unfinalized transactions
702        // can access their reject votes from TransactionCertifier.
703        let gc_round = self
704            .dag_state
705            .read()
706            .calculate_gc_round(last_finalized_commit_round);
707        self.transaction_certifier.run_gc(gc_round);
708    }
709
710    #[cfg(test)]
711    fn is_empty(&self) -> bool {
712        self.pending_commits.is_empty() && self.blocks.read().is_empty()
713    }
714}
715
/// Finalization state tracked for a single commit while it is pending.
/// The commit is considered finalized once both `pending_blocks` and
/// `pending_transactions` are empty.
struct CommitState {
    // The commit being finalized. Its rejected transactions are filled in from
    // `rejected_transactions` when the commit is popped as finalized.
    commit: CommittedSubDag,
    // Blocks pending finalization, mapped to the number of transactions in the block.
    // This field is populated by all blocks in the commit, before direct finalization.
    // After direct finalization, this field becomes empty.
    pending_blocks: BTreeMap<BlockRef, usize>,
    // Transactions pending indirect finalization.
    // This field is populated after direct finalization, if pending transactions exist.
    // Values in this field are removed as transactions are indirectly finalized or directly rejected.
    // When both pending_blocks and pending_transactions are empty, the commit is finalized.
    pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
    // Transactions rejected by a quorum or indirectly, per block.
    rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
}
730
731impl CommitState {
732    fn new(commit: CommittedSubDag) -> Self {
733        let pending_blocks: BTreeMap<_, _> = commit
734            .blocks
735            .iter()
736            .map(|b| (b.reference(), b.transactions().len()))
737            .collect();
738        assert!(!pending_blocks.is_empty());
739        Self {
740            commit,
741            pending_blocks,
742            pending_transactions: BTreeMap::new(),
743            rejected_transactions: BTreeMap::new(),
744        }
745    }
746
747    fn remove_pending_transactions(
748        &mut self,
749        block_ref: &BlockRef,
750        transactions: &[TransactionIndex],
751    ) {
752        let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
753            return;
754        };
755        for t in transactions {
756            block_pending_txns.remove(t);
757        }
758        if block_pending_txns.is_empty() {
759            self.pending_transactions.remove(block_ref);
760        }
761    }
762}
763
/// Per-block state kept by the finalizer: the block itself, plus the links and
/// reject votes used when accumulating votes for pending transactions.
struct BlockState {
    // Content of the block.
    block: VerifiedBlock,
    // Blocks which have an explicit ancestor linking to this block.
    children: BTreeSet<BlockRef>,
    // Reject votes cast by this block, and by linked ancestors from the same authority.
    reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
    // Other committed blocks that are origin descendants of this block.
    origin_descendants: Vec<BlockRef>,
}
774
775impl BlockState {
776    fn new(block: VerifiedBlock) -> Self {
777        let reject_votes: BTreeMap<_, _> = block
778            .transaction_votes()
779            .iter()
780            .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
781            .collect();
782        // With at most 4 pending commits and assume 2 origin descendants per commit,
783        // there will be at most 8 origin descendants.
784        let origin_descendants = Vec::with_capacity(8);
785        Self {
786            block,
787            children: BTreeSet::new(),
788            reject_votes,
789            origin_descendants,
790        }
791    }
792}
793
794#[cfg(test)]
795mod tests {
796    use mysten_metrics::monitored_mpsc;
797    use parking_lot::RwLock;
798
799    use crate::{
800        TestBlock, VerifiedBlock, block::BlockTransactionVotes, block_verifier::NoopBlockVerifier,
801        dag_state::DagState, linearizer::Linearizer, storage::mem_store::MemStore,
802        test_dag_builder::DagBuilder,
803    };
804
805    use super::*;
806
    // Test fixture bundling a CommitFinalizer together with the context, DagState and
    // TransactionCertifier it is constructed from, plus a Linearizer built on the same
    // context and DagState for producing CommittedSubDag from leaders.
    struct Fixture {
        context: Arc<Context>,
        dag_state: Arc<RwLock<DagState>>,
        transaction_certifier: TransactionCertifier,
        linearizer: Linearizer,
        commit_finalizer: CommitFinalizer,
    }
814
815    impl Fixture {
816        fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
817            self.transaction_certifier
818                .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
819            self.dag_state.write().accept_blocks(blocks);
820        }
821    }
822
823    fn create_commit_finalizer_fixture() -> Fixture {
824        let (context, _keys) = Context::new_for_test(4);
825        let context = Arc::new(context);
826        let dag_state = Arc::new(RwLock::new(DagState::new(
827            context.clone(),
828            Arc::new(MemStore::new()),
829        )));
830        let linearizer = Linearizer::new(context.clone(), dag_state.clone());
831        let (blocks_sender, _blocks_receiver) =
832            monitored_mpsc::unbounded_channel("consensus_block_output");
833        let transaction_certifier = TransactionCertifier::new(
834            context.clone(),
835            Arc::new(NoopBlockVerifier {}),
836            dag_state.clone(),
837            blocks_sender,
838        );
839        let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
840        let commit_finalizer = CommitFinalizer::new(
841            context.clone(),
842            dag_state.clone(),
843            transaction_certifier.clone(),
844            commit_sender,
845        );
846        Fixture {
847            context,
848            dag_state,
849            transaction_certifier,
850            linearizer,
851            commit_finalizer,
852        }
853    }
854
855    fn create_block(
856        round: Round,
857        authority: u32,
858        mut ancestors: Vec<BlockRef>,
859        num_transactions: usize,
860        reject_votes: Vec<BlockTransactionVotes>,
861    ) -> VerifiedBlock {
862        // Move own authority ancestor to the front of the ancestors.
863        let i = ancestors
864            .iter()
865            .position(|b| b.author.value() == authority as usize)
866            .unwrap_or_else(|| {
867                panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
868            });
869        let b = ancestors.remove(i);
870        ancestors.insert(0, b);
871        // Create test block.
872        let block = TestBlock::new(round, authority)
873            .set_ancestors(ancestors)
874            .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
875            .set_transaction_votes(reject_votes)
876            .build();
877        VerifiedBlock::new_for_test(block)
878    }
879
880    #[tokio::test]
881    async fn test_direct_finalize_no_reject_votes() {
882        let mut fixture = create_commit_finalizer_fixture();
883
884        // Create round 1-4 blocks with 10 transactions each. Add these blocks to transaction certifier.
885        let mut dag_builder = DagBuilder::new(fixture.context.clone());
886        dag_builder
887            .layers(1..=4)
888            .num_transactions(10)
889            .build()
890            .persist_layers(fixture.dag_state.clone());
891        let blocks = dag_builder.all_blocks();
892        fixture
893            .transaction_certifier
894            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
895
896        // Select a round 2 block as the leader and create CommittedSubDag.
897        let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
898        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
899        assert_eq!(committed_sub_dags.len(), 1);
900        let committed_sub_dag = &committed_sub_dags[0];
901
902        // This committed sub-dag can be directly finalized.
903        let finalized_commits = fixture
904            .commit_finalizer
905            .process_commit(committed_sub_dag.clone())
906            .await;
907        assert_eq!(finalized_commits.len(), 1);
908        let finalized_commit = &finalized_commits[0];
909        assert_eq!(committed_sub_dag, finalized_commit);
910
911        // CommitFinalizer should be empty.
912        assert!(fixture.commit_finalizer.is_empty());
913    }
914
    // A commit can be directly finalized when, at the time it is added to the commit finalizer,
    // every transaction that has reject votes already has a quorum of reject votes.
    #[tokio::test]
    async fn test_direct_finalize_with_reject_votes() {
        let mut fixture = create_commit_finalizer_fixture();

        // Create round 1 blocks with 10 transactions each.
        // Authority 3's block is registered with [0, 3], the same indices used
        // in the reject votes below. All other blocks are registered with an empty list.
        let mut dag_builder = DagBuilder::new(fixture.context.clone());
        dag_builder
            .layer(1)
            .num_transactions(10)
            .build()
            .persist_layers(fixture.dag_state.clone());
        let round_1_blocks = dag_builder.all_blocks();
        fixture.transaction_certifier.add_voted_blocks(
            round_1_blocks
                .iter()
                .map(|b| {
                    if b.author().value() != 3 {
                        (b.clone(), vec![])
                    } else {
                        (b.clone(), vec![0, 3])
                    }
                })
                .collect(),
        );

        // Select the block with rejected transaction.
        let block_with_rejected_txn = round_1_blocks[3].clone();
        // Reject vote on transactions 0 and 3 of that block, reused by several blocks below.
        let reject_vote = BlockTransactionVotes {
            block_ref: block_with_rejected_txn.reference(),
            rejects: vec![0, 3],
        };

        // Create round 2 blocks for authorities 0, 1 and 2 only.
        // These are the ancestors without authority 3's block from round 1.
        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
        // Leader links to block_with_rejected_txn, but other blocks do not.
        let round_2_blocks = vec![
            create_block(
                2,
                0,
                round_1_blocks.iter().map(|b| b.reference()).collect(),
                10,
                vec![reject_vote.clone()],
            ),
            create_block(2, 1, ancestors.clone(), 10, vec![]),
            create_block(2, 2, ancestors.clone(), 10, vec![]),
        ];
        fixture.add_blocks(round_2_blocks.clone());

        // Select round 2 authority 0 block as the leader and create CommittedSubDag.
        // The sub-dag contains the 4 round 1 blocks and the leader itself.
        let leader = round_2_blocks[0].clone();
        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
        assert_eq!(committed_sub_dags.len(), 1);
        let committed_sub_dag = &committed_sub_dags[0];
        assert_eq!(committed_sub_dag.blocks.len(), 5);

        // Create round 3 blocks voting on the leader.
        // Authorities 1, 2 and 3 cast the reject vote in this round.
        // Authority 3 has no round 2 block, so its round 1 block is used as its own ancestor.
        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
        let round_3_blocks = vec![
            create_block(3, 0, ancestors.clone(), 0, vec![]),
            create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
            create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
            create_block(
                3,
                3,
                std::iter::once(round_1_blocks[3].reference())
                    .chain(ancestors.clone())
                    .collect(),
                0,
                vec![reject_vote.clone()],
            ),
        ];
        fixture.add_blocks(round_3_blocks.clone());

        // Create round 4 blocks certifying the leader.
        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
        let round_4_blocks = vec![
            create_block(4, 0, ancestors.clone(), 0, vec![]),
            create_block(4, 1, ancestors.clone(), 0, vec![]),
            create_block(4, 2, ancestors.clone(), 0, vec![]),
            create_block(4, 3, ancestors.clone(), 0, vec![]),
        ];
        fixture.add_blocks(round_4_blocks.clone());

        // This committed sub-dag can be directly finalized because the rejected transactions
        // have a quorum of votes.
        let finalized_commits = fixture
            .commit_finalizer
            .process_commit(committed_sub_dag.clone())
            .await;
        assert_eq!(finalized_commits.len(), 1);
        let finalized_commit = &finalized_commits[0];
        assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
        assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
        // Only the block from authority 3 has rejected transactions, which are 0 and 3.
        assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
        assert_eq!(
            finalized_commit
                .rejected_transactions_by_block
                .get(&block_with_rejected_txn.reference())
                .unwrap()
                .clone(),
            vec![0, 3],
        );

        // CommitFinalizer should be empty.
        assert!(fixture.commit_finalizer.is_empty());
    }
1023
    // Test indirect finalization when:
    // 1. Reject votes on a transaction do not reach quorum initially, but reach quorum later.
    // 2. Transaction is indirectly rejected.
    // 3. Transaction is indirectly finalized.
    #[tokio::test]
    async fn test_indirect_finalize_with_reject_votes() {
        let mut fixture = create_commit_finalizer_fixture();

        // Create round 1 blocks with 10 transactions each.
        let mut dag_builder = DagBuilder::new(fixture.context.clone());
        dag_builder
            .layer(1)
            .num_transactions(10)
            .build()
            .persist_layers(fixture.dag_state.clone());
        let round_1_blocks = dag_builder.all_blocks();
        fixture.transaction_certifier.add_voted_blocks(
            round_1_blocks
                .iter()
                .map(|b| {
                    if b.author().value() != 3 {
                        (b.clone(), vec![])
                    } else {
                        (b.clone(), vec![0, 3])
                    }
                })
                .collect(),
        );

        // Select the block with rejected transaction.
        let block_with_rejected_txn = round_1_blocks[3].clone();
        // How transactions in this block will be voted:
        // Txn 1 (quorum reject): 1 reject vote at round 2, 1 reject vote at round 3, and 1 at round 4.
        // Txn 4 (indirect reject): 1 reject vote at round 2, and 1 at round 3.
        // Txn 7 (indirect finalize): 1 reject vote at round 3.

        // Create round 2 blocks without authority 3.
        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
        // Leader links to block_with_rejected_txn, but other blocks do not.
        let round_2_blocks = vec![
            create_block(
                2,
                0,
                round_1_blocks.iter().map(|b| b.reference()).collect(),
                10,
                vec![BlockTransactionVotes {
                    block_ref: block_with_rejected_txn.reference(),
                    rejects: vec![1, 4],
                }],
            ),
            // Use ancestors without authority 3 to avoid voting on its transactions.
            create_block(2, 1, ancestors.clone(), 10, vec![]),
            create_block(2, 2, ancestors.clone(), 10, vec![]),
        ];
        fixture.add_blocks(round_2_blocks.clone());

        // Select round 2 authority 0 block as the first leader.
        let mut leaders = vec![round_2_blocks[0].clone()];

        // Create round 3 blocks voting on the leader and casting reject votes.
        // Only authorities 0, 1 and 3 produce a block in this round.
        // Authority 3 has no round 2 block, so its round 1 block is used as its own ancestor.
        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
        let round_3_blocks = vec![
            create_block(3, 0, ancestors.clone(), 0, vec![]),
            create_block(
                3,
                1,
                ancestors.clone(),
                0,
                vec![BlockTransactionVotes {
                    block_ref: block_with_rejected_txn.reference(),
                    rejects: vec![1, 4, 7],
                }],
            ),
            create_block(
                3,
                3,
                std::iter::once(round_1_blocks[3].reference())
                    .chain(ancestors.clone())
                    .collect(),
                0,
                vec![],
            ),
        ];
        fixture.add_blocks(round_3_blocks.clone());
        // The round 3 leader is the block from authority 3.
        leaders.push(round_3_blocks[2].clone());

        // Create round 4 blocks certifying the leader and casting reject votes.
        // Authority 2 has no round 3 block, so its round 2 block is used as its own ancestor.
        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
        let round_4_blocks = vec![
            create_block(4, 0, ancestors.clone(), 0, vec![]),
            create_block(4, 1, ancestors.clone(), 0, vec![]),
            create_block(
                4,
                2,
                std::iter::once(round_2_blocks[2].reference())
                    .chain(ancestors.clone())
                    .collect(),
                0,
                vec![BlockTransactionVotes {
                    block_ref: block_with_rejected_txn.reference(),
                    rejects: vec![1],
                }],
            ),
            create_block(4, 3, ancestors.clone(), 0, vec![]),
        ];
        fixture.add_blocks(round_4_blocks.clone());
        // The round 4 leader is the block from authority 1.
        leaders.push(round_4_blocks[1].clone());

        // Create round 5-7 blocks without casting reject votes.
        // Select the last leader from round 5. It is necessary to have round 5 leader to indirectly finalize
        // transactions committed by round 2 leader.
        let mut last_round_blocks = round_4_blocks.clone();
        for r in 5..=7 {
            let ancestors: Vec<BlockRef> =
                last_round_blocks.iter().map(|b| b.reference()).collect();
            let round_blocks: Vec<_> = (0..4)
                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
                .collect();
            fixture.add_blocks(round_blocks.clone());
            if r == 5 {
                leaders.push(round_blocks[0].clone());
            }
            last_round_blocks = round_blocks;
        }

        // Create CommittedSubDag from leaders.
        assert_eq!(leaders.len(), 4);
        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
        assert_eq!(committed_sub_dags.len(), 4);

        // Buffering the initial 3 commits should not finalize.
        for commit in committed_sub_dags.iter().take(3) {
            let finalized_commits = fixture
                .commit_finalizer
                .process_commit(commit.clone())
                .await;
            assert_eq!(finalized_commits.len(), 0);
        }

        // Buffering the 4th commit should finalize all commits.
        let finalized_commits = fixture
            .commit_finalizer
            .process_commit(committed_sub_dags[3].clone())
            .await;
        assert_eq!(finalized_commits.len(), 4);

        // Check rejected transactions.
        // Txn 1 and txn 4 are rejected. Txn 7 is not rejected.
        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
        assert_eq!(rejected_transactions.len(), 1);
        assert_eq!(
            rejected_transactions
                .get(&block_with_rejected_txn.reference())
                .unwrap(),
            &vec![1, 4]
        );

        // Other commits should have no rejected transactions.
        for commit in finalized_commits.iter().skip(1) {
            assert!(commit.rejected_transactions_by_block.is_empty());
        }

        // CommitFinalizer should be empty.
        assert!(fixture.commit_finalizer.is_empty());
    }
1188
    // Test finalization of a sequence of commits, where the first 4 commits are processed
    // as remote commits and the last 2 commits are processed as local commits.
    #[tokio::test]
    async fn test_finalize_remote_commits_with_reject_votes() {
        let mut fixture: Fixture = create_commit_finalizer_fixture();
        // Blocks grouped by round: all_blocks[i] holds the blocks of round i + 1.
        let mut all_blocks = vec![];

        // Create round 1 blocks with 10 transactions each.
        // Blocks are not added to the fixture here. They are added per commit by the helper below.
        let mut dag_builder = DagBuilder::new(fixture.context.clone());
        dag_builder.layer(1).num_transactions(10).build();
        let round_1_blocks = dag_builder.all_blocks();
        all_blocks.push(round_1_blocks.clone());

        // Collect leaders from round 1.
        let mut leaders = vec![round_1_blocks[0].clone()];

        // Create round 2-9 blocks and set leaders until round 7.
        // Round 5 has no leader. The leader of round r is the block from authority r % 4.
        let mut last_round_blocks = round_1_blocks.clone();
        for r in 2..=9 {
            let ancestors: Vec<BlockRef> =
                last_round_blocks.iter().map(|b| b.reference()).collect();
            let round_blocks: Vec<_> = (0..4)
                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
                .collect();
            all_blocks.push(round_blocks.clone());
            if r <= 7 && r != 5 {
                leaders.push(round_blocks[r as usize % 4].clone());
            }
            last_round_blocks = round_blocks;
        }

        // Leader rounds: 1, 2, 3, 4, 6, 7.
        assert_eq!(leaders.len(), 6);

        // Adds the blocks needed by the commit of leaders[index] to the fixture, creates
        // the commit, marks it as local or remote according to `local`, and processes it.
        // Returns the commits finalized by processing it.
        async fn add_blocks_and_process_commit(
            fixture: &mut Fixture,
            leaders: &[VerifiedBlock],
            all_blocks: &[Vec<VerifiedBlock>],
            index: usize,
            local: bool,
        ) -> Vec<CommittedSubDag> {
            let leader = leaders[index].clone();
            // Add blocks related to the commit to DagState and TransactionCertifier.
            // For a local commit, blocks up to 2 rounds after the leader round are added.
            // For a remote commit, only blocks up to the leader round are added.
            if local {
                for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
                    fixture.add_blocks(round_blocks.clone());
                }
            } else {
                for round_blocks in all_blocks.iter().take(leader.round() as usize) {
                    fixture.add_blocks(round_blocks.clone());
                }
            };
            // Generate the commit from leader, and mark it as local or remote.
            let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
            assert_eq!(committed_sub_dags.len(), 1);
            let mut remote_commit = committed_sub_dags.pop().unwrap();
            remote_commit.decided_with_local_blocks = local;
            // Process the commit.
            fixture
                .commit_finalizer
                .process_commit(remote_commit.clone())
                .await
        }

        // Add commit 1-3 as remote commits. There should be no finalized commits.
        for i in 0..3 {
            let finalized_commits =
                add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
            assert!(finalized_commits.is_empty());
        }

        // Buffer round 4 commit as a remote commit. This should finalize the 1st commit at round 1.
        let finalized_commits =
            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
        assert_eq!(finalized_commits.len(), 1);
        assert_eq!(finalized_commits[0].commit_ref.index, 1);
        assert_eq!(finalized_commits[0].leader.round, 1);

        // Buffer round 6 (5th) commit as local commit. This should help finalize the commits at round 2 and 3.
        let finalized_commits =
            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
        assert_eq!(finalized_commits.len(), 2);
        assert_eq!(finalized_commits[0].commit_ref.index, 2);
        assert_eq!(finalized_commits[0].leader.round, 2);
        assert_eq!(finalized_commits[1].commit_ref.index, 3);
        assert_eq!(finalized_commits[1].leader.round, 3);

        // Buffer round 7 (6th) commit as local commit. This should help finalize the commits at round 4, 6 and 7 (itself).
        let finalized_commits =
            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
        assert_eq!(finalized_commits.len(), 3);
        assert_eq!(finalized_commits[0].commit_ref.index, 4);
        assert_eq!(finalized_commits[0].leader.round, 4);
        assert_eq!(finalized_commits[1].commit_ref.index, 5);
        assert_eq!(finalized_commits[1].leader.round, 6);
        assert_eq!(finalized_commits[2].commit_ref.index, 6);
        assert_eq!(finalized_commits[2].leader.round, 7);

        // CommitFinalizer should be empty.
        assert!(fixture.commit_finalizer.is_empty());
    }
1288}