consensus_core/
commit_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    sync::Arc,
7};
8
9use consensus_config::Stake;
10use consensus_types::block::{BlockRef, Round, TransactionIndex};
11use mysten_metrics::{
12    monitored_mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
13    monitored_scope, spawn_logged_monitored_task,
14};
15use parking_lot::RwLock;
16
17use crate::{
18    BlockAPI, CommitIndex, CommittedSubDag, VerifiedBlock,
19    commit::DEFAULT_WAVE_LENGTH,
20    context::Context,
21    dag_state::DagState,
22    error::{ConsensusError, ConsensusResult},
23    stake_aggregator::{QuorumThreshold, StakeAggregator},
24    transaction_certifier::TransactionCertifier,
25};
26
27/// For transaction T committed at leader round R, when a new leader at round >= R + INDIRECT_REJECT_DEPTH
28/// commits and T is still not finalized, T is rejected.
29/// NOTE: 3 round is the minimum depth possible for indirect finalization and rejection.
30pub(crate) const INDIRECT_REJECT_DEPTH: Round = 3;
31
32/// Handle to CommitFinalizer, for sending CommittedSubDag.
33pub(crate) struct CommitFinalizerHandle {
34    sender: UnboundedSender<CommittedSubDag>,
35}
36
37impl CommitFinalizerHandle {
38    // Sends a CommittedSubDag to CommitFinalizer, which will finalize it before sending it to execution.
39    pub(crate) fn send(&self, commit: CommittedSubDag) -> ConsensusResult<()> {
40        self.sender.send(commit).map_err(|e| {
41            tracing::warn!("Failed to send to commit finalizer, probably due to shutdown: {e:?}");
42            ConsensusError::Shutdown
43        })
44    }
45}
46
47/// CommitFinalizer accepts a continuous stream of CommittedSubDag and outputs
48/// them when they are finalized.
49/// In finalized commits, every transaction is either finalized or rejected.
50/// It runs in a separate thread, to reduce the load on the core thread.
51///
52/// Life of a finalized commit:
53///
54/// For efficiency, finalization happens first for transactions without reject votes (common case).
55/// The pending undecided transactions with reject votes are individually finalized or rejected.
56/// When there is no more pending transactions, the commit is finalized.
57///
58/// This is correct because regardless if a commit leader was directly or indirectly committed,
59/// every committed block can be considered finalized, because at least one leader certificate of the commit
60/// will be committed, which can also serve as a certificate for the block and its transactions.
61///
62/// From the earliest buffered commit, pending blocks are checked to see if they are now finalized.
63/// New finalized blocks are removed from the pending blocks, and its transactions are moved to the
64/// finalized, rejected or pending state. If the commit now has no pending blocks or transactions,
65/// the commit is finalized and popped from the buffer. The next earliest commit is then processed
66/// similarly, until either the buffer becomes empty or a commit with pending blocks or transactions
67/// is encountered.
68pub struct CommitFinalizer {
69    context: Arc<Context>,
70    dag_state: Arc<RwLock<DagState>>,
71    transaction_certifier: TransactionCertifier,
72    commit_sender: UnboundedSender<CommittedSubDag>,
73
74    // Last commit index processed by CommitFinalizer.
75    last_processed_commit: Option<CommitIndex>,
76    // Commits pending finalization.
77    pending_commits: VecDeque<CommitState>,
78    // Blocks in the pending commits.
79    blocks: Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
80}
81
82impl CommitFinalizer {
83    pub fn new(
84        context: Arc<Context>,
85        dag_state: Arc<RwLock<DagState>>,
86        transaction_certifier: TransactionCertifier,
87        commit_sender: UnboundedSender<CommittedSubDag>,
88    ) -> Self {
89        Self {
90            context,
91            dag_state,
92            transaction_certifier,
93            commit_sender,
94            last_processed_commit: None,
95            pending_commits: VecDeque::new(),
96            blocks: Arc::new(RwLock::new(BTreeMap::new())),
97        }
98    }
99
100    pub(crate) fn start(
101        context: Arc<Context>,
102        dag_state: Arc<RwLock<DagState>>,
103        transaction_certifier: TransactionCertifier,
104        commit_sender: UnboundedSender<CommittedSubDag>,
105    ) -> CommitFinalizerHandle {
106        let processor = Self::new(context, dag_state, transaction_certifier, commit_sender);
107        let (sender, receiver) = unbounded_channel("consensus_commit_finalizer");
108        let _handle =
109            spawn_logged_monitored_task!(processor.run(receiver), "consensus_commit_finalizer");
110        CommitFinalizerHandle { sender }
111    }
112
113    async fn run(mut self, mut receiver: UnboundedReceiver<CommittedSubDag>) {
114        while let Some(committed_sub_dag) = receiver.recv().await {
115            let already_finalized = !self.context.protocol_config.mysticeti_fastpath()
116                || committed_sub_dag.recovered_rejected_transactions;
117            let finalized_commits = if !already_finalized {
118                self.process_commit(committed_sub_dag).await
119            } else {
120                vec![committed_sub_dag]
121            };
122            if !finalized_commits.is_empty() {
123                // Transaction certifier state should be GC'ed as soon as new commits are finalized.
124                // But this is done outside of process_commit(), because during recovery process_commit()
125                // is not called to finalize commits, but GC still needs to run.
126                self.try_update_gc_round(finalized_commits.last().unwrap().leader.round);
127                let mut dag_state = self.dag_state.write();
128                if !already_finalized {
129                    // Records rejected transactions in newly finalized commits.
130                    for commit in &finalized_commits {
131                        dag_state.add_finalized_commit(
132                            commit.commit_ref,
133                            commit.rejected_transactions_by_block.clone(),
134                        );
135                    }
136                }
137                // Commits and committed blocks must be persisted to storage before sending them to Sui
138                // to execute their finalized transactions.
139                // Commit metadata and uncommitted blocks can be persisted more lazily because they are recoverable.
140                // But for simplicity, all unpersisted commits and blocks are flushed to storage.
141                dag_state.flush();
142            }
143            for commit in finalized_commits {
144                if let Err(e) = self.commit_sender.send(commit) {
145                    tracing::warn!(
146                        "Failed to send to commit handler, probably due to shutdown: {e:?}"
147                    );
148                    return;
149                }
150            }
151        }
152    }
153
154    pub async fn process_commit(
155        &mut self,
156        committed_sub_dag: CommittedSubDag,
157    ) -> Vec<CommittedSubDag> {
158        let _scope = monitored_scope("CommitFinalizer::process_commit");
159
160        if let Some(last_processed_commit) = self.last_processed_commit {
161            assert_eq!(
162                last_processed_commit + 1,
163                committed_sub_dag.commit_ref.index
164            );
165        }
166        self.last_processed_commit = Some(committed_sub_dag.commit_ref.index);
167
168        self.pending_commits
169            .push_back(CommitState::new(committed_sub_dag));
170
171        let mut finalized_commits = vec![];
172
173        // The prerequisite for running direct finalization on a commit is that the commit must
174        // have either a quorum of leader certificates in the local DAG, or a committed leader certificate.
175        //
176        // A leader certificate is a finalization certificate for every block in the commit.
177        // When the prerequisite holds, all blocks in the current commit can be considered finalized.
178        // And any transaction in the current commit that has not observed reject votes will never be rejected.
179        // So these transactions are directly finalized.
180        //
181        // When a commit is direct, there are a quorum of its leader certificates in the local DAG.
182        //
183        // When a commit is indirect, it implies one of its leader certificates is in the committed blocks.
184        // So a leader certificate must exist in the local DAG as well.
185        //
186        // When a commit is received through commit sync and processed as certified commit, the commit might
187        // not have a leader certificate in the local DAG. So a committed transaction might not observe any reject
188        // vote from local DAG, although it will eventually get rejected. To finalize blocks in this commit,
189        // there must be another commit with leader round >= 3 (WAVE_LENGTH) rounds above the commit leader.
190        // From the indirect commit rule, a leader certificate must exist in committed blocks for the earliest commit.
191        for i in 0..self.pending_commits.len() {
192            let commit_state = &self.pending_commits[i];
193            if commit_state.pending_blocks.is_empty() {
194                // The commit has already been processed through direct finalization.
195                continue;
196            }
197            // Direct finalization cannot happen when
198            // -  This commit is remote.
199            // -  And the latest commit is less than 3 (WAVE_LENGTH) rounds above this commit.
200            // In this case, this commit's leader certificate is not guaranteed to be in local DAG.
201            if !commit_state.commit.decided_with_local_blocks {
202                let last_commit_state = self.pending_commits.back().unwrap();
203                if commit_state.commit.leader.round + DEFAULT_WAVE_LENGTH
204                    > last_commit_state.commit.leader.round
205                {
206                    break;
207                }
208            }
209            self.try_direct_finalize_commit(i);
210        }
211        let direct_finalized_commits = self.pop_finalized_commits();
212        self.context
213            .metrics
214            .node_metrics
215            .finalizer_output_commits
216            .with_label_values(&["direct"])
217            .inc_by(direct_finalized_commits.len() as u64);
218        finalized_commits.extend(direct_finalized_commits);
219
220        // Indirect finalization: one or more commits cannot be directly finalized.
221        // So the pending transactions need to be checked for indirect finalization.
222        if !self.pending_commits.is_empty() {
223            // Initialize the state of the last added commit for computing indirect finalization.
224            //
225            // As long as there are remaining commits, even if the last commit has been directly finalized,
226            // its state still needs to be initialized here to help indirectly finalize previous commits.
227            // This is because the last commit may have been directly finalized, but its previous commits
228            // may not have been directly finalized.
229            self.link_blocks_in_last_commit();
230            self.append_origin_descendants_from_last_commit();
231            // Try to indirectly finalize a prefix of the buffered commits.
232            // If only one commit remains, it cannot be indirectly finalized because there is no commit afterwards,
233            // so it is excluded.
234            while self.pending_commits.len() > 1 {
235                // Stop indirect finalization when the earliest commit has not been processed
236                // through direct finalization.
237                if !self.pending_commits[0].pending_blocks.is_empty() {
238                    break;
239                }
240                // Otherwise, try to indirectly finalize the earliest commit.
241                self.try_indirect_finalize_first_commit().await;
242                let indirect_finalized_commits = self.pop_finalized_commits();
243                if indirect_finalized_commits.is_empty() {
244                    // No additional commits can be indirectly finalized.
245                    break;
246                }
247                self.context
248                    .metrics
249                    .node_metrics
250                    .finalizer_output_commits
251                    .with_label_values(&["indirect"])
252                    .inc_by(indirect_finalized_commits.len() as u64);
253                finalized_commits.extend(indirect_finalized_commits);
254            }
255        }
256
257        self.context
258            .metrics
259            .node_metrics
260            .finalizer_buffered_commits
261            .set(self.pending_commits.len() as i64);
262
263        finalized_commits
264    }
265
266    // Tries directly finalizing transactions in the commit.
267    // Direct commit means every transaction in the commit can be considered to have a quorum of post-commit certificates,
268    // unless (1) the transaction has reject votes that do not reach quorum, or
269    // (2) the block containing the transaction is outside the GC bound of the commit's leader.
270    // In the 2nd case, when the blocks voting and certifying this commit's leader were proposed, there is a chance
271    // that some of these voting and certifying blocks do not include votes for the transactions below the leader's GC bound.
272    // So conservatively, these transactions are not directly finalized. The logic here matches the GC logic in
273    // try_indirect_finalize_pending_transactions_in_block().
274    fn try_direct_finalize_commit(&mut self, index: usize) {
275        let metrics = &self.context.metrics.node_metrics;
276        let num_commits = self.pending_commits.len();
277        let commit_state = self
278            .pending_commits
279            .get_mut(index)
280            .unwrap_or_else(|| panic!("Commit {} does not exist. len = {}", index, num_commits));
281
282        // Estimate conservatively the GC round of the blocks voting and certifying this commit's leader of round (R).
283        //
284        // The key question we try to answer: "Could the voting blocks (R+1) and certifying blocks (R+2) have seen all blocks in this commit at the time they were proposed by their respective nodes, or
285        // could they have discarded them due to their local GC round already advanced by being ahead in their commit round?"
286        //
287        // It's not possible to know the exact GC round that was used when the voting and certifying blocks were proposed, so we use the most conservative threshold.
288        // We assume that the nodes proposing those blocks had already committed the `commit_state.commit.leader.round + INDIRECT_REJECT_DEPTH` - (R+3) and we calculate the gc_round of it as the cut off.
289        //
290        // Why we use the (R+3) leader round though?
291        //
292        // According to the protocol (R+3) is the min num of rounds needed for the leader's certificate to appear in a commit. Since we can't know which exact blocks form the certificate, we assume (R+3) being this minimum
293        // limit which also aligns with the assumptions made in the indirect finalization logic.
294        //
295        // So we assume as if the blocks that certify the leader (R) had advanced to the commit (R+3), and thus any block below the `vote_gc_round`, the gc round after the (R+3) leader was committed, could have been locally gced
296        // and never been voted by the certifying blocks.
297        let vote_gc_round = self
298            .dag_state
299            .read()
300            .calculate_gc_round(commit_state.commit.leader.round + INDIRECT_REJECT_DEPTH);
301        tracing::debug!(
302            "Trying to direct finalize commit {} using vote GC round {}",
303            commit_state.commit.commit_ref,
304            vote_gc_round,
305        );
306
307        // Each commit can only try direct finalization once.
308        assert!(!commit_state.pending_blocks.is_empty());
309        let pending_blocks = std::mem::take(&mut commit_state.pending_blocks);
310
311        for (block_ref, num_transactions) in pending_blocks {
312            if block_ref.round <= vote_gc_round && num_transactions > 0 {
313                // The block is outside of GC bound.
314                let transactions =
315                    (0..(num_transactions as TransactionIndex)).collect::<BTreeSet<_>>();
316                commit_state
317                    .pending_transactions
318                    .entry(block_ref)
319                    .or_default()
320                    .extend(transactions);
321                let hostname = &self.context.committee.authority(block_ref.author).hostname;
322                metrics
323                    .finalizer_skipped_voting_blocks
324                    .with_label_values(&[hostname.as_str(), "direct"])
325                    .inc();
326                tracing::debug!(
327                    "Block {} is potentially outside of GC bound from its leader {} in commit {}. Skipping direct finalization.",
328                    block_ref,
329                    commit_state.commit.leader,
330                    commit_state.commit.commit_ref
331                );
332                continue;
333            }
334            let reject_votes = self.transaction_certifier.get_reject_votes(&block_ref)
335                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is either incorrectly gc'ed or failed to be recovered after crash."));
336            metrics
337                .finalizer_transaction_status
338                .with_label_values(&["direct_finalize"])
339                .inc_by((num_transactions - reject_votes.len()) as u64);
340            let hostname = &self.context.committee.authority(block_ref.author).hostname;
341            metrics
342                .finalizer_reject_votes
343                .with_label_values(&[hostname])
344                .inc_by(reject_votes.len() as u64);
345            // If a transaction_index does not exist in reject_votes, the transaction has no reject votes.
346            // So it is finalized and does not need to be added to pending_transactions.
347            for (transaction_index, stake) in reject_votes {
348                // If the transaction has > 0 but < 2f+1 reject votes, it is still pending.
349                // Otherwise, it is rejected.
350                let entry = if stake < self.context.committee.quorum_threshold() {
351                    commit_state
352                        .pending_transactions
353                        .entry(block_ref)
354                        .or_default()
355                } else {
356                    metrics
357                        .finalizer_transaction_status
358                        .with_label_values(&["direct_reject"])
359                        .inc();
360                    commit_state
361                        .rejected_transactions
362                        .entry(block_ref)
363                        .or_default()
364                };
365                entry.insert(transaction_index);
366            }
367        }
368    }
369
370    // Creates an entry in the blocks map for each block in the commit,
371    // and have its ancestors link to the block.
372    fn link_blocks_in_last_commit(&mut self) {
373        let commit_state = self
374            .pending_commits
375            .back_mut()
376            .unwrap_or_else(|| panic!("No pending commit."));
377
378        // Link blocks in ascending order of round, to ensure ancestor block states are created
379        // before they are linked from.
380        let mut blocks = commit_state.commit.blocks.clone();
381        blocks.sort_by_key(|b| b.round());
382
383        let mut blocks_map = self.blocks.write();
384        for block in blocks {
385            let block_ref = block.reference();
386            // Link ancestors to the block.
387            for ancestor in block.ancestors() {
388                // Ancestor may not exist in the blocks map if it has been finalized or gc'ed.
389                // So skip linking if the ancestor does not exist.
390                if let Some(ancestor_block) = blocks_map.get(ancestor) {
391                    ancestor_block.write().children.insert(block_ref);
392                }
393            }
394            // Initialize the block state.
395            blocks_map.entry(block_ref).or_insert_with(|| {
396                RwLock::new(BlockState::new(block, commit_state.commit.commit_ref.index))
397            });
398        }
399    }
400
401    /// Updates the set of origin descendants, by appending blocks from the last commit to
402    /// origin descendants of previous linked blocks from the same origin.
403    ///
404    /// The purpose of maintaining the origin descendants per block is to save bandwidth by avoiding to explicitly
405    /// list all accept votes on transactions in blocks.
406    /// Instead when an ancestor block Ba is first included by a proposed block Bp, reject votes for transactions in Ba
407    /// are explicitly listed (if they exist). The rest of non-rejected transactions in Ba are assumed to be accepted by Bp.
408    /// This vote compression rule must be applied during vote aggregation as well.
409    ///
410    /// The above rule is equivalent to saying that transactions in a block can only be voted on by its immediate descendants.
411    /// A block Bp is an **immediate descendant** of Ba, if any directed path from Bp to Ba does not contain a block from Bp's own authority.
412    ///
413    /// This rule implies the following optimization is possible: after collecting votes for Ba from block Bp,
414    /// we can skip collecting votes from Bp's **origin descendants** (descendant blocks from the
415    /// same authority), because they cannot vote on Ba anyway.
416    ///
417    /// This vote compression rule is easy to implement when proposing blocks. Reject votes can be gathered against
418    /// all the newly included ancestors of the proposed block. But vote decompression is trickier to get right.
419    /// One edge case is when a block may not be an immediate descendant, because of GC. In this case votes from the
420    /// block should not be counted.
421    fn append_origin_descendants_from_last_commit(&mut self) {
422        let commit_state = self
423            .pending_commits
424            .back_mut()
425            .unwrap_or_else(|| panic!("No pending commit."));
426        let mut committed_blocks = commit_state.commit.blocks.clone();
427        committed_blocks.sort_by_key(|b| b.round());
428        let blocks_map = self.blocks.read();
429        for committed_block in committed_blocks {
430            let committed_block_ref = committed_block.reference();
431            // Each block must have at least one ancestor.
432            // Block verification ensures the first ancestor is from the block's own authority.
433            // Also, block verification ensures each authority appears at most once among ancestors.
434            let mut origin_ancestor_ref = *blocks_map
435                .get(&committed_block_ref)
436                .unwrap()
437                .read()
438                .block
439                .ancestors()
440                .first()
441                .unwrap();
442            while origin_ancestor_ref.author == committed_block_ref.author {
443                let Some(origin_ancestor_block) = blocks_map.get(&origin_ancestor_ref) else {
444                    break;
445                };
446                origin_ancestor_block
447                    .write()
448                    .origin_descendants
449                    .push(committed_block_ref);
450                origin_ancestor_ref = *origin_ancestor_block
451                    .read()
452                    .block
453                    .ancestors()
454                    .first()
455                    .unwrap();
456            }
457        }
458    }
459
460    // Tries indirectly finalizing the buffered commits at the given index.
461    async fn try_indirect_finalize_first_commit(&mut self) {
462        // Ensure direct finalization has been attempted for the commit.
463        assert!(!self.pending_commits.is_empty());
464        assert!(self.pending_commits[0].pending_blocks.is_empty());
465
466        // Optional optimization: re-check pending transactions to see if they are rejected by a quorum now.
467        self.check_pending_transactions_in_first_commit();
468
469        // Check if remaining pending transactions can be finalized.
470        self.try_indirect_finalize_pending_transactions_in_first_commit()
471            .await;
472
473        // Check if remaining pending transactions can be indirectly rejected.
474        self.try_indirect_reject_pending_transactions_in_first_commit();
475    }
476
477    fn check_pending_transactions_in_first_commit(&mut self) {
478        let mut all_rejected_transactions: Vec<(BlockRef, Vec<TransactionIndex>)> = vec![];
479
480        // Collect all rejected transactions without modifying state
481        for (block_ref, pending_transactions) in &self.pending_commits[0].pending_transactions {
482            let reject_votes: BTreeMap<TransactionIndex, Stake> = self
483                .transaction_certifier
484                .get_reject_votes(block_ref)
485                .unwrap_or_else(|| panic!("No vote info found for {block_ref}. It is incorrectly gc'ed or failed to be recovered after crash."))
486                .into_iter()
487                .collect();
488            let mut rejected_transactions = vec![];
489            for &transaction_index in pending_transactions {
490                // Pending transactions do not have reject votes when the block is outside of GC bound from the commit leader's round.
491                let reject_stake = reject_votes
492                    .get(&transaction_index)
493                    .copied()
494                    .unwrap_or_default();
495                if reject_stake < self.context.committee.quorum_threshold() {
496                    // The transaction cannot be rejected yet.
497                    continue;
498                }
499                // Otherwise, mark the transaction for rejection.
500                rejected_transactions.push(transaction_index);
501            }
502            if !rejected_transactions.is_empty() {
503                all_rejected_transactions.push((*block_ref, rejected_transactions));
504            }
505        }
506
507        // Move rejected transactions from pending_transactions.
508        for (block_ref, rejected_transactions) in all_rejected_transactions {
509            self.context
510                .metrics
511                .node_metrics
512                .finalizer_transaction_status
513                .with_label_values(&["direct_late_reject"])
514                .inc_by(rejected_transactions.len() as u64);
515            let curr_commit_state = &mut self.pending_commits[0];
516            curr_commit_state.remove_pending_transactions(&block_ref, &rejected_transactions);
517            curr_commit_state
518                .rejected_transactions
519                .entry(block_ref)
520                .or_default()
521                .extend(rejected_transactions);
522        }
523    }
524
525    async fn try_indirect_finalize_pending_transactions_in_first_commit(&mut self) {
526        tracing::debug!(
527            "Trying to indirectly finalize pending transactions in first commit {}",
528            self.pending_commits[0].commit.commit_ref,
529        );
530        let _scope = monitored_scope(
531            "CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
532        );
533
534        let pending_blocks: Vec<_> = self.pending_commits[0]
535            .pending_transactions
536            .iter()
537            .map(|(k, v)| (*k, v.clone()))
538            .collect();
539
540        let gc_rounds = self
541            .pending_commits
542            .iter()
543            .map(|c| {
544                (
545                    c.commit.commit_ref.index,
546                    self.dag_state
547                        .read()
548                        .calculate_gc_round(c.commit.leader.round),
549                )
550            })
551            .collect::<Vec<_>>();
552
553        // Number of blocks to process in each task.
554        const BLOCKS_PER_INDIRECT_COMMIT_TASK: usize = 8;
555
556        // Process chunks in parallel.
557        let mut all_finalized_transactions = vec![];
558        let mut handles = Vec::new();
559        // TODO(fastpath): investigate using a cost based batching,
560        // for example each block has cost num authorities + pending_transactions.len().
561        for chunk in pending_blocks.chunks(BLOCKS_PER_INDIRECT_COMMIT_TASK) {
562            let context = self.context.clone();
563            let blocks = self.blocks.clone();
564            let gc_rounds = gc_rounds.clone();
565            let chunk: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = chunk.to_vec();
566
567            let handle = tokio::task::spawn_blocking(move || {
568                let mut chunk_results = Vec::new();
569
570                for (block_ref, pending_transactions) in chunk {
571                    let finalized = Self::try_indirect_finalize_pending_transactions_in_block(
572                        &context,
573                        &blocks,
574                        &gc_rounds,
575                        block_ref,
576                        pending_transactions,
577                    );
578
579                    if !finalized.is_empty() {
580                        chunk_results.push((block_ref, finalized));
581                    }
582                }
583
584                chunk_results
585            });
586
587            handles.push(handle);
588        }
589
590        // Collect results from all chunks
591        for handle in handles {
592            let result = match handle.await {
593                Ok(chunk_results) => {
594                    all_finalized_transactions.extend(chunk_results);
595                    continue;
596                }
597                Err(e) => e,
598            };
599            if result.is_panic() {
600                std::panic::resume_unwind(result.into_panic());
601            }
602            tracing::info!("Process likely shutting down: {:?}", result);
603            // Ok to return. No potential inconsistency in state.
604            return;
605        }
606
607        for (block_ref, finalized_transactions) in all_finalized_transactions {
608            self.context
609                .metrics
610                .node_metrics
611                .finalizer_transaction_status
612                .with_label_values(&["indirect_finalize"])
613                .inc_by(finalized_transactions.len() as u64);
614            // Remove finalized transactions from pending transactions.
615            self.pending_commits[0]
616                .remove_pending_transactions(&block_ref, &finalized_transactions);
617        }
618    }
619
620    fn try_indirect_reject_pending_transactions_in_first_commit(&mut self) {
621        let curr_leader_round = self.pending_commits[0].commit.leader.round;
622        let last_commit_leader_round = self.pending_commits.back().unwrap().commit.leader.round;
623        if curr_leader_round + INDIRECT_REJECT_DEPTH <= last_commit_leader_round {
624            let curr_commit_state = &mut self.pending_commits[0];
625            // This function is called after trying to indirectly finalize pending blocks.
626            // When last commit leader round is INDIRECT_REJECT_DEPTH rounds higher or more,
627            // all pending blocks should have been finalized.
628            assert!(curr_commit_state.pending_blocks.is_empty());
629            // This function is called after trying to indirectly finalize pending transactions.
630            // All remaining pending transactions, since they are not finalized, should now be
631            // indirectly rejected.
632            let pending_transactions = std::mem::take(&mut curr_commit_state.pending_transactions);
633            for (block_ref, pending_transactions) in pending_transactions {
634                self.context
635                    .metrics
636                    .node_metrics
637                    .finalizer_transaction_status
638                    .with_label_values(&["indirect_reject"])
639                    .inc_by(pending_transactions.len() as u64);
640                curr_commit_state
641                    .rejected_transactions
642                    .entry(block_ref)
643                    .or_default()
644                    .extend(pending_transactions);
645            }
646        }
647    }
648
649    // Returns the indices of the requested pending transactions that are indirectly finalized.
650    // This function is used for checking finalization of transactions, so it must traverse
651    // all blocks which can contribute to the requested transactions' finalizations.
652    fn try_indirect_finalize_pending_transactions_in_block(
653        context: &Arc<Context>,
654        blocks: &Arc<RwLock<BTreeMap<BlockRef, RwLock<BlockState>>>>,
655        gc_rounds: &[(CommitIndex, Round)],
656        pending_block_ref: BlockRef,
657        pending_transactions: BTreeSet<TransactionIndex>,
658    ) -> Vec<TransactionIndex> {
659        if pending_transactions.is_empty() {
660            return vec![];
661        }
662        let mut accept_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>> =
663            pending_transactions
664                .into_iter()
665                .map(|transaction_index| (transaction_index, StakeAggregator::new()))
666                .collect();
667        let mut finalized_transactions = vec![];
668        let blocks_map = blocks.read();
669        // Use BTreeSet for to_visit_blocks, to visit blocks in the earliest round first.
670        let (pending_commit_index, mut to_visit_blocks) = {
671            let block_state = blocks_map.get(&pending_block_ref).unwrap().read();
672            (block_state.commit_index, block_state.children.clone())
673        };
674        // Blocks that have been visited.
675        let mut visited = BTreeSet::new();
676        // Blocks where votes and origin descendants should be ignored for processing.
677        let mut ignored = BTreeSet::new();
678        // Traverse children blocks breadth-first and accumulate accept votes for pending transactions.
679        while let Some(curr_block_ref) = to_visit_blocks.pop_first() {
680            if !visited.insert(curr_block_ref) {
681                continue;
682            }
683            let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
684            // Check if transaction votes for the pending block are potentially not carried by the
685            // current block, because of GC at the current block's proposer.
686            // See comment above gced_transaction_votes_for_pending_block() for more details.
687            //
688            // Implicit transaction votes should only be considered in commit finalizer if they are definitely
689            // part of the transactions votes from the current block when it is proposed.
690            let votes_gced = Self::gced_transaction_votes_for_pending_block(
691                gc_rounds,
692                pending_block_ref.round,
693                pending_commit_index,
694                curr_block_state.commit_index,
695            );
696            // Skip counting votes from the block if it has been marked to be ignored.
697            if ignored.insert(curr_block_ref) {
698                // Skip collecting votes from origin descendants of current block.
699                // Votes from origin descendants of current block do not count for these transactions.
700                // Consider this case: block B is an origin descendant of block A (from the same authority),
701                // and both blocks A and B link to another block C.
702                // Only B's implicit and explicit transaction votes on C are considered.
703                // None of A's implicit or explicit transaction votes on C should be considered.
704                //
705                // See append_origin_descendants_from_last_commit() for more details.
706                ignored.extend(curr_block_state.origin_descendants.iter());
707                // Skip counting votes from current block if the votes on pending block could have been
708                // casted by an earlier block from the same origin, or the votes might not be proposed due to GC.
709                // Note: if the current block casts reject votes on transactions in the pending block,
710                // it can be assumed that accept votes are also casted to other transactions in the pending block.
711                // But we choose to skip counting the accept votes in this edge case for simplicity.
712                if votes_gced {
713                    let hostname = &context
714                        .committee
715                        .authority(pending_block_ref.author)
716                        .hostname;
717                    context
718                        .metrics
719                        .node_metrics
720                        .finalizer_skipped_voting_blocks
721                        .with_label_values(&[hostname.as_str(), "indirect"])
722                        .inc();
723                    tracing::debug!(
724                        "Block {} is potentially outside of GC bound from current block {}. Skipping indirect finalization.",
725                        pending_block_ref,
726                        curr_block_ref,
727                    );
728                    continue;
729                }
730                // Get reject votes from current block to the pending block.
731                let curr_block_reject_votes = curr_block_state
732                    .reject_votes
733                    .get(&pending_block_ref)
734                    .cloned()
735                    .unwrap_or_default();
736                // Because of lifetime, first collect finalized transactions, and then remove them from accept_votes.
737                let mut newly_finalized = vec![];
738                for (index, stake) in &mut accept_votes {
739                    // Skip if the transaction has been rejected by the current block.
740                    if curr_block_reject_votes.contains(index) {
741                        continue;
742                    }
743                    // Skip if the total stake has not reached quorum.
744                    if !stake.add(curr_block_ref.author, &context.committee) {
745                        continue;
746                    }
747                    newly_finalized.push(*index);
748                    finalized_transactions.push(*index);
749                }
750                // There is no need to aggregate additional votes for already finalized transactions.
751                for index in newly_finalized {
752                    accept_votes.remove(&index);
753                }
754                // End traversal if all blocks and requested transactions have reached quorum.
755                if accept_votes.is_empty() {
756                    break;
757                }
758            }
759            // Add additional children blocks to visit.
760            to_visit_blocks.extend(
761                curr_block_state
762                    .children
763                    .iter()
764                    .filter(|b| !visited.contains(*b)),
765            );
766        }
767        finalized_transactions
768    }
769
770    /// Returns true if transaction votes from the current block to the pending block
771    /// could have been be GC'ed. If this is the case, the current block cannot be assumed
772    /// to have implicitly voted to accept transactions in the pending block.
773    ///
774    /// When collecting transaction votes during proposal of the current block
775    /// (via DagState::link_causal_history()), votes against blocks in the DAG
776    /// below the proposer's GC round are skipped. Implicit accept votes cannot be assumed
777    /// for these GC'ed blocks. However, blocks do not carry the GC round when they are proposed.
778    /// So this function computes the highest possible GC round when the current block was proposed,
779    /// and use it as the minimum round threshold for implicit accept votes. Even if the computed
780    /// GC round here is higher than the actual GC round used by the current block, it is still
781    /// correct although less efficient.
782    ///
783    /// gc_rounds is a list of cached commit indices and the GC rounds resulting from the commits.
784    /// It must be a superset of commits in the range [pending_commit_index, current_commit_index].
785    /// The first element should have pending_commit_index, because pending commit should be the
786    /// first commit buffered in CommitFinalizer.
787    fn gced_transaction_votes_for_pending_block(
788        gc_rounds: &[(CommitIndex, Round)],
789        pending_block_round: Round,
790        pending_commit_index: CommitIndex,
791        current_commit_index: CommitIndex,
792    ) -> bool {
793        assert!(
794            pending_commit_index <= current_commit_index,
795            "Pending {pending_commit_index} should be <= current {current_commit_index}"
796        );
797        if pending_commit_index == current_commit_index {
798            return false;
799        }
800        // current_commit_index is the commit index which includes the current / voting block.
801        // When the current block was proposed, the latest/highest possible GC round that could have been used is the GC round computed
802        // from the leader of the previous commit (current_commit_index - 1). This acts as the most conservative threshold to make sure
803        // that the current block had actually "seen" the blocks within it's local GC bound.
804        let (commit_index, gc_round) = *gc_rounds
805            .get((current_commit_index - 1 - pending_commit_index) as usize)
806            .unwrap();
807        assert_eq!(
808            commit_index,
809            current_commit_index - 1,
810            "Commit index mismatch {commit_index} != {current_commit_index}"
811        );
812        pending_block_round <= gc_round
813    }
814
815    fn pop_finalized_commits(&mut self) -> Vec<CommittedSubDag> {
816        let mut finalized_commits = vec![];
817
818        while let Some(commit_state) = self.pending_commits.front() {
819            if !commit_state.pending_blocks.is_empty()
820                || !commit_state.pending_transactions.is_empty()
821            {
822                // The commit is not finalized yet.
823                break;
824            }
825
826            // Pop the finalized commit and set its rejected transactions.
827            let commit_state = self.pending_commits.pop_front().unwrap();
828            let mut commit = commit_state.commit;
829            for (block_ref, rejected_transactions) in commit_state.rejected_transactions {
830                commit
831                    .rejected_transactions_by_block
832                    .insert(block_ref, rejected_transactions.into_iter().collect());
833            }
834
835            // Clean up committed blocks.
836            let mut blocks_map = self.blocks.write();
837            for block in commit.blocks.iter() {
838                blocks_map.remove(&block.reference());
839            }
840
841            let round_delay = if let Some(last_commit_state) = self.pending_commits.back() {
842                last_commit_state.commit.leader.round - commit.leader.round
843            } else {
844                0
845            };
846            self.context
847                .metrics
848                .node_metrics
849                .finalizer_round_delay
850                .observe(round_delay as f64);
851
852            finalized_commits.push(commit);
853        }
854
855        finalized_commits
856    }
857
858    fn try_update_gc_round(&mut self, last_finalized_commit_round: Round) {
859        // GC TransactionCertifier state only with finalized commits, to ensure unfinalized transactions
860        // can access their reject votes from TransactionCertifier.
861        let gc_round = self
862            .dag_state
863            .read()
864            .calculate_gc_round(last_finalized_commit_round);
865        self.transaction_certifier.run_gc(gc_round);
866    }
867
868    #[cfg(test)]
869    fn is_empty(&self) -> bool {
870        self.pending_commits.is_empty() && self.blocks.read().is_empty()
871    }
872}
873
874struct CommitState {
875    commit: CommittedSubDag,
876    // Blocks pending finalization, mapped to the number of transactions in the block.
877    // This field is populated by all blocks in the commit, before direct finalization.
878    // After direct finalization, this field becomes empty.
879    pending_blocks: BTreeMap<BlockRef, usize>,
880    // Transactions pending indirect finalization.
881    // This field is populated after direct finalization, if pending transactions exist.
882    // Values in this field are removed as transactions are indirectly finalized or directly rejected.
883    // When both pending_blocks and pending_transactions are empty, the commit is finalized.
884    pending_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
885    // Transactions rejected by a quorum or indirectly, per block.
886    rejected_transactions: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
887}
888
889impl CommitState {
890    fn new(commit: CommittedSubDag) -> Self {
891        let pending_blocks: BTreeMap<_, _> = commit
892            .blocks
893            .iter()
894            .map(|b| (b.reference(), b.transactions().len()))
895            .collect();
896        assert!(!pending_blocks.is_empty());
897        Self {
898            commit,
899            pending_blocks,
900            pending_transactions: BTreeMap::new(),
901            rejected_transactions: BTreeMap::new(),
902        }
903    }
904
905    fn remove_pending_transactions(
906        &mut self,
907        block_ref: &BlockRef,
908        transactions: &[TransactionIndex],
909    ) {
910        let Some(block_pending_txns) = self.pending_transactions.get_mut(block_ref) else {
911            return;
912        };
913        for t in transactions {
914            block_pending_txns.remove(t);
915        }
916        if block_pending_txns.is_empty() {
917            self.pending_transactions.remove(block_ref);
918        }
919    }
920}
921
922struct BlockState {
923    // Content of the block.
924    block: VerifiedBlock,
925    // Blocks which has an explicit ancestor linking to this block.
926    children: BTreeSet<BlockRef>,
927    // Reject votes casted by this block, and by linked ancestors from the same authority.
928    reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
929    // Other committed blocks that are origin descendants of this block.
930    // See the comment above append_origin_descendants_from_last_commit() for more details.
931    origin_descendants: Vec<BlockRef>,
932    // Commit which contains this block.
933    commit_index: CommitIndex,
934}
935
936impl BlockState {
937    fn new(block: VerifiedBlock, commit_index: CommitIndex) -> Self {
938        let reject_votes: BTreeMap<_, _> = block
939            .transaction_votes()
940            .iter()
941            .map(|v| (v.block_ref, v.rejects.clone().into_iter().collect()))
942            .collect();
943        // With at most 4 pending commits and assume 2 origin descendants per commit,
944        // there will be at most 8 origin descendants.
945        let origin_descendants = Vec::with_capacity(8);
946        Self {
947            block,
948            children: BTreeSet::new(),
949            reject_votes,
950            origin_descendants,
951            commit_index,
952        }
953    }
954}
955
956#[cfg(test)]
957mod tests {
958    use crate::{
959        TestBlock, VerifiedBlock, block::BlockTransactionVotes,
960        commit_test_fixture::CommitTestFixture, test_dag_builder::DagBuilder,
961    };
962
963    use super::*;
964
965    fn create_commit_finalizer_fixture() -> CommitTestFixture {
966        CommitTestFixture::with_options(4, 0, Some(5))
967    }
968
969    fn create_block(
970        round: Round,
971        authority: u32,
972        mut ancestors: Vec<BlockRef>,
973        num_transactions: usize,
974        reject_votes: Vec<BlockTransactionVotes>,
975    ) -> VerifiedBlock {
976        // Move own authority ancestor to the front of the ancestors.
977        let i = ancestors
978            .iter()
979            .position(|b| b.author.value() == authority as usize)
980            .unwrap_or_else(|| {
981                panic!("Authority {authority} (round {round}) not found in {ancestors:?}")
982            });
983        let b = ancestors.remove(i);
984        ancestors.insert(0, b);
985        // Create test block.
986        let block = TestBlock::new(round, authority)
987            .set_ancestors(ancestors)
988            .set_transactions(vec![crate::Transaction::new(vec![1; 16]); num_transactions])
989            .set_transaction_votes(reject_votes)
990            .build();
991        VerifiedBlock::new_for_test(block)
992    }
993
994    #[tokio::test]
995    async fn test_direct_finalize_no_reject_votes() {
996        let mut fixture = create_commit_finalizer_fixture();
997
998        // Create round 1-4 blocks with 10 transactions each. Add these blocks to transaction certifier.
999        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1000        dag_builder.layers(1..=4).num_transactions(10).build();
1001        let blocks = dag_builder.all_blocks();
1002        fixture.add_blocks(blocks.clone());
1003
1004        // Select a round 2 block as the leader and create CommittedSubDag.
1005        let leader = blocks.iter().find(|b| b.round() == 2).unwrap();
1006        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1007        assert_eq!(committed_sub_dags.len(), 1);
1008        let committed_sub_dag = &committed_sub_dags[0];
1009
1010        // This committed sub-dag can be directly finalized.
1011        let finalized_commits = fixture
1012            .commit_finalizer
1013            .process_commit(committed_sub_dag.clone())
1014            .await;
1015        assert_eq!(finalized_commits.len(), 1);
1016        let finalized_commit = &finalized_commits[0];
1017        assert_eq!(committed_sub_dag, finalized_commit);
1018
1019        // CommitFinalizer should be empty.
1020        assert!(fixture.commit_finalizer.is_empty());
1021    }
1022
1023    // Commits can be directly finalized if when they are added to commit finalizer,
1024    // the rejected votes reach quorum if they exist on any transaction.
1025    #[tokio::test]
1026    async fn test_direct_finalize_with_reject_votes() {
1027        let mut fixture = create_commit_finalizer_fixture();
1028
1029        // Create round 1 blocks with 10 transactions each.
1030        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1031        dag_builder.layer(1).num_transactions(10).build();
1032
1033        let round_1_blocks = dag_builder.all_blocks();
1034        fixture.add_blocks_with_own_votes(
1035            round_1_blocks
1036                .iter()
1037                .map(|b| {
1038                    if b.author().value() != 3 {
1039                        (b.clone(), vec![])
1040                    } else {
1041                        (b.clone(), vec![0, 3])
1042                    }
1043                })
1044                .collect(),
1045        );
1046
1047        // Select the block with rejected transaction.
1048        let block_with_rejected_txn = round_1_blocks[3].clone();
1049        let reject_vote = BlockTransactionVotes {
1050            block_ref: block_with_rejected_txn.reference(),
1051            rejects: vec![0, 3],
1052        };
1053
1054        // Create round 2 blocks without authority 3's block from round 1.
1055        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1056        // Leader links to block_with_rejected_txn, but other blocks do not.
1057        let round_2_blocks = vec![
1058            create_block(
1059                2,
1060                0,
1061                round_1_blocks.iter().map(|b| b.reference()).collect(),
1062                10,
1063                vec![reject_vote.clone()],
1064            ),
1065            create_block(2, 1, ancestors.clone(), 10, vec![]),
1066            create_block(2, 2, ancestors.clone(), 10, vec![]),
1067        ];
1068        fixture.add_blocks(round_2_blocks.clone());
1069
1070        // Select round 2 authority 0 block as the leader and create CommittedSubDag.
1071        let leader = round_2_blocks[0].clone();
1072        let committed_sub_dags = fixture.linearizer.handle_commit(vec![leader.clone()]);
1073        assert_eq!(committed_sub_dags.len(), 1);
1074        let committed_sub_dag = &committed_sub_dags[0];
1075        assert_eq!(committed_sub_dag.blocks.len(), 5);
1076
1077        // Create round 3 blocks voting on the leader.
1078        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1079        let round_3_blocks = vec![
1080            create_block(3, 0, ancestors.clone(), 0, vec![]),
1081            create_block(3, 1, ancestors.clone(), 0, vec![reject_vote.clone()]),
1082            create_block(3, 2, ancestors.clone(), 0, vec![reject_vote.clone()]),
1083            create_block(
1084                3,
1085                3,
1086                std::iter::once(round_1_blocks[3].reference())
1087                    .chain(ancestors.clone())
1088                    .collect(),
1089                0,
1090                vec![reject_vote.clone()],
1091            ),
1092        ];
1093        fixture.add_blocks(round_3_blocks.clone());
1094
1095        // Create round 4 blocks certifying the leader.
1096        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1097        let round_4_blocks = vec![
1098            create_block(4, 0, ancestors.clone(), 0, vec![]),
1099            create_block(4, 1, ancestors.clone(), 0, vec![]),
1100            create_block(4, 2, ancestors.clone(), 0, vec![]),
1101            create_block(4, 3, ancestors.clone(), 0, vec![]),
1102        ];
1103        fixture.add_blocks(round_4_blocks.clone());
1104
1105        // This committed sub-dag can be directly finalized because the rejected transactions
1106        // have a quorum of votes.
1107        let finalized_commits = fixture
1108            .commit_finalizer
1109            .process_commit(committed_sub_dag.clone())
1110            .await;
1111        assert_eq!(finalized_commits.len(), 1);
1112        let finalized_commit = &finalized_commits[0];
1113        assert_eq!(committed_sub_dag.commit_ref, finalized_commit.commit_ref);
1114        assert_eq!(committed_sub_dag.blocks, finalized_commit.blocks);
1115        assert_eq!(finalized_commit.rejected_transactions_by_block.len(), 1);
1116        assert_eq!(
1117            finalized_commit
1118                .rejected_transactions_by_block
1119                .get(&block_with_rejected_txn.reference())
1120                .unwrap()
1121                .clone(),
1122            vec![0, 3],
1123        );
1124
1125        // CommitFinalizer should be empty.
1126        assert!(fixture.commit_finalizer.is_empty());
1127    }
1128
1129    // Test indirect finalization when:
1130    // 1. Reject votes on transaction does not reach quorum initially, but reach quorum later.
1131    // 2. Transaction is indirectly rejected.
1132    // 3. Transaction is indirectly finalized.
1133    #[tokio::test]
1134    async fn test_indirect_finalize_with_reject_votes() {
1135        let mut fixture = create_commit_finalizer_fixture();
1136
1137        // Create round 1 blocks with 10 transactions each.
1138        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1139        dag_builder.layer(1).num_transactions(10).build();
1140
1141        let round_1_blocks = dag_builder.all_blocks();
1142        fixture.add_blocks_with_own_votes(
1143            round_1_blocks
1144                .iter()
1145                .map(|b| {
1146                    if b.author().value() != 3 {
1147                        (b.clone(), vec![])
1148                    } else {
1149                        (b.clone(), vec![0, 3])
1150                    }
1151                })
1152                .collect(),
1153        );
1154
1155        // Select the block with rejected transaction.
1156        let block_with_rejected_txn = round_1_blocks[3].clone();
1157        // How transactions in this block will be voted:
1158        // Txn 1 (quorum reject): 1 reject vote at round 2, 1 reject vote at round 3, and 1 at round 4.
1159        // Txn 4 (indirect reject): 1 reject vote at round 3, and 1 at round 4.
1160        // Txn 7 (indirect finalize): 1 reject vote at round 3.
1161
1162        // Create round 2 blocks without authority 3.
1163        let ancestors: Vec<BlockRef> = round_1_blocks[0..3].iter().map(|b| b.reference()).collect();
1164        // Leader links to block_with_rejected_txn, but other blocks do not.
1165        let round_2_blocks = vec![
1166            create_block(
1167                2,
1168                0,
1169                round_1_blocks.iter().map(|b| b.reference()).collect(),
1170                10,
1171                vec![BlockTransactionVotes {
1172                    block_ref: block_with_rejected_txn.reference(),
1173                    rejects: vec![1, 4],
1174                }],
1175            ),
1176            // Use ancestors without authority 3 to avoid voting on its transactions.
1177            create_block(2, 1, ancestors.clone(), 10, vec![]),
1178            create_block(2, 2, ancestors.clone(), 10, vec![]),
1179        ];
1180        fixture.add_blocks(round_2_blocks.clone());
1181
1182        // Select round 2 authority 0 block as the a leader.
1183        let mut leaders = vec![round_2_blocks[0].clone()];
1184
1185        // Create round 3 blocks voting on the leader and casting reject votes.
1186        let ancestors: Vec<BlockRef> = round_2_blocks.iter().map(|b| b.reference()).collect();
1187        let round_3_blocks = vec![
1188            create_block(3, 0, ancestors.clone(), 0, vec![]),
1189            create_block(
1190                3,
1191                1,
1192                ancestors.clone(),
1193                0,
1194                vec![BlockTransactionVotes {
1195                    block_ref: block_with_rejected_txn.reference(),
1196                    rejects: vec![1, 4, 7],
1197                }],
1198            ),
1199            create_block(
1200                3,
1201                3,
1202                std::iter::once(round_1_blocks[3].reference())
1203                    .chain(ancestors.clone())
1204                    .collect(),
1205                0,
1206                vec![],
1207            ),
1208        ];
1209        fixture.add_blocks(round_3_blocks.clone());
1210        leaders.push(round_3_blocks[2].clone());
1211
1212        // Create round 4 blocks certifying the leader and casting reject votes.
1213        let ancestors: Vec<BlockRef> = round_3_blocks.iter().map(|b| b.reference()).collect();
1214        let round_4_blocks = vec![
1215            create_block(4, 0, ancestors.clone(), 0, vec![]),
1216            create_block(4, 1, ancestors.clone(), 0, vec![]),
1217            create_block(
1218                4,
1219                2,
1220                std::iter::once(round_2_blocks[2].reference())
1221                    .chain(ancestors.clone())
1222                    .collect(),
1223                0,
1224                vec![BlockTransactionVotes {
1225                    block_ref: block_with_rejected_txn.reference(),
1226                    rejects: vec![1],
1227                }],
1228            ),
1229            create_block(4, 3, ancestors.clone(), 0, vec![]),
1230        ];
1231        fixture.add_blocks(round_4_blocks.clone());
1232        leaders.push(round_4_blocks[1].clone());
1233
1234        // Create round 5-7 blocks without casting reject votes.
1235        // Select the last leader from round 5. It is necessary to have round 5 leader to indirectly finalize
1236        // transactions committed by round 2 leader.
1237        let mut last_round_blocks = round_4_blocks.clone();
1238        for r in 5..=7 {
1239            let ancestors: Vec<BlockRef> =
1240                last_round_blocks.iter().map(|b| b.reference()).collect();
1241            let round_blocks: Vec<_> = (0..4)
1242                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1243                .collect();
1244            fixture.add_blocks(round_blocks.clone());
1245            if r == 5 {
1246                leaders.push(round_blocks[0].clone());
1247            }
1248            last_round_blocks = round_blocks;
1249        }
1250
1251        // Create CommittedSubDag from leaders.
1252        assert_eq!(leaders.len(), 4);
1253        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1254        assert_eq!(committed_sub_dags.len(), 4);
1255
1256        // Buffering the initial 3 commits should not finalize.
1257        for commit in committed_sub_dags.iter().take(3) {
1258            let finalized_commits = fixture
1259                .commit_finalizer
1260                .process_commit(commit.clone())
1261                .await;
1262            assert_eq!(finalized_commits.len(), 0);
1263        }
1264
1265        // Buffering the 4th commit should finalize all commits.
1266        let finalized_commits = fixture
1267            .commit_finalizer
1268            .process_commit(committed_sub_dags[3].clone())
1269            .await;
1270        assert_eq!(finalized_commits.len(), 4);
1271
1272        // Check rejected transactions.
1273        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1274        assert_eq!(rejected_transactions.len(), 1);
1275        assert_eq!(
1276            rejected_transactions
1277                .get(&block_with_rejected_txn.reference())
1278                .unwrap(),
1279            &vec![1, 4]
1280        );
1281
1282        // Other commits should have no rejected transactions.
1283        for commit in finalized_commits.iter().skip(1) {
1284            assert!(commit.rejected_transactions_by_block.is_empty());
1285        }
1286
1287        // CommitFinalizer should be empty.
1288        assert!(fixture.commit_finalizer.is_empty());
1289    }
1290
1291    // Test direct finalization when a block is at or below GC round from the block's own leader.
1292    #[tokio::test]
1293    async fn test_direct_finalize_with_gc() {
1294        let mut fixture = create_commit_finalizer_fixture();
1295        assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1296
1297        // Create round 1 blocks with 10 transactions each.
1298        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1299        dag_builder.layer(1).num_transactions(10).build();
1300        let round_1_blocks = dag_builder.all_blocks();
1301        fixture.add_blocks(round_1_blocks.clone());
1302
1303        // Select B1(3) to be rejected due to GC.
1304        let block_rejected = round_1_blocks[3].clone();
1305
1306        // Create round 2-5 blocks without creating or linking to an authority 4 block.
1307        // The goal is to GC B1(3).
1308        let mut last_round_blocks: Vec<VerifiedBlock> = round_1_blocks
1309            .iter()
1310            .enumerate()
1311            .filter_map(|(i, b)| {
1312                if i != block_rejected.author().value() {
1313                    Some(b.clone())
1314                } else {
1315                    None
1316                }
1317            })
1318            .collect();
1319        for r in 2..=5 {
1320            let ancestors: Vec<BlockRef> =
1321                last_round_blocks.iter().map(|b| b.reference()).collect();
1322            last_round_blocks = [0, 1, 2]
1323                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1324                .to_vec();
1325            fixture.add_blocks(last_round_blocks.clone());
1326        }
1327
1328        // Create round 6-9 blocks without authority 3 blocks.
1329        // And add a leader from authority 0 of each round. Only authority 0 blocks can link to B1(3).
1330        let mut leaders = vec![];
1331        for r in 6..=9 {
1332            let ancestors: Vec<BlockRef> =
1333                last_round_blocks.iter().map(|b| b.reference()).collect();
1334            last_round_blocks = [0, 1, 2]
1335                .map(|i| {
1336                    let mut ancestors = ancestors.clone();
1337                    if i == 0 {
1338                        // Link to the GC'ed block B1(3).
1339                        ancestors.push(block_rejected.reference());
1340                    }
1341                    create_block(r, i, ancestors, 0, vec![])
1342                })
1343                .to_vec();
1344            leaders.push(last_round_blocks[0].clone());
1345            fixture.add_blocks(last_round_blocks.clone());
1346        }
1347
1348        // Create CommittedSubDag from leaders.
1349        assert_eq!(leaders.len(), 4);
1350        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1351        assert_eq!(committed_sub_dags.len(), 4);
1352
1353        // Ensure B1(3) is included in commit 0.
1354        assert!(committed_sub_dags[0].blocks.contains(&block_rejected));
1355
1356        // Buffering the initial 3 commits should not finalize.
1357        for commit in committed_sub_dags.iter().take(3) {
1358            assert!(commit.decided_with_local_blocks);
1359            let finalized_commits = fixture
1360                .commit_finalizer
1361                .process_commit(commit.clone())
1362                .await;
1363            assert_eq!(finalized_commits.len(), 0);
1364        }
1365
1366        // Buffering the 4th commit should finalize all commits.
1367        let finalized_commits = fixture
1368            .commit_finalizer
1369            .process_commit(committed_sub_dags[3].clone())
1370            .await;
1371        assert_eq!(finalized_commits.len(), 4);
1372
1373        // Check rejected transactions.
1374        // B1(3) all transactions get rejected as it is effectively voted only by one block from authority 3 (the block it self) and authority 0. Due to vote compression only the first block of authority 0 is counted.
1375        // The block is out of GC bound for the B7(1) and B7(2) blocks which are committed by the leader of round 8. Thus no accept votes are counted from authorities 1 & 2.
1376        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1377        assert_eq!(rejected_transactions.len(), 1);
1378        assert_eq!(
1379            rejected_transactions
1380                .get(&block_rejected.reference())
1381                .unwrap(),
1382            &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1383        );
1384
1385        // Other commits should have no rejected transactions.
1386        for commit in finalized_commits.iter().skip(1) {
1387            assert!(commit.rejected_transactions_by_block.is_empty());
1388        }
1389
1390        // CommitFinalizer should be empty.
1391        assert!(fixture.commit_finalizer.is_empty());
1392    }
1393
1394    // Test indirect finalization when transaction is rejected due to GC.
1395    #[tokio::test]
1396    async fn test_indirect_reject_with_gc() {
1397        let mut fixture = create_commit_finalizer_fixture();
1398        assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);
1399
1400        // Create round 1 blocks with 10 transactions each.
1401        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1402        dag_builder.layer(1).num_transactions(10).build();
1403
1404        let round_1_blocks = dag_builder.all_blocks();
1405        fixture.add_blocks(round_1_blocks.clone());
1406
1407        // Select B1(3) to have a rejected transaction.
1408        let block_with_rejected_txn = round_1_blocks[3].clone();
1409        // How transactions in this block will be voted:
1410        // Txn 1 (GC reject): 1 reject vote at round 2. But the txn will get rejected because there are only
1411        // 2 accept votes.
1412
1413        // Create round 2 blocks, with B2(1) rejecting transaction 1 from B1(3).
1414        // Note that 3 blocks link to B1(3) without rejecting transaction 1.
1415        let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
1416        let round_2_blocks = vec![
1417            create_block(2, 0, ancestors.clone(), 0, vec![]),
1418            create_block(
1419                2,
1420                1,
1421                ancestors.clone(),
1422                0,
1423                vec![BlockTransactionVotes {
1424                    block_ref: block_with_rejected_txn.reference(),
1425                    rejects: vec![1],
1426                }],
1427            ),
1428            create_block(2, 2, ancestors.clone(), 0, vec![]),
1429            create_block(2, 3, ancestors.clone(), 0, vec![]),
1430        ];
1431        fixture.add_blocks(round_2_blocks.clone());
1432
1433        // Create round 3-6 blocks without creating or linking to an authority 2 block.
1434        // The goal is to GC B2(2).
1435        let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
1436            .iter()
1437            .enumerate()
1438            .filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
1439            .collect();
1440        for r in 3..=6 {
1441            let ancestors: Vec<BlockRef> =
1442                last_round_blocks.iter().map(|b| b.reference()).collect();
1443            last_round_blocks = [0, 1, 3]
1444                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1445                .to_vec();
1446            fixture.add_blocks(last_round_blocks.clone());
1447        }
1448
1449        // Create round 7-10 blocks and add a leader from authority 0 of each round.
1450        let mut leaders = vec![];
1451        for r in 7..=10 {
1452            let ancestors: Vec<BlockRef> =
1453                last_round_blocks.iter().map(|b| b.reference()).collect();
1454            last_round_blocks = (0..4)
1455                .map(|i| {
1456                    let mut ancestors = ancestors.clone();
1457                    if r == 7 && i == 2 {
1458                        // Link to the GC'ed block B2(2).
1459                        ancestors.push(round_2_blocks[2].reference());
1460                    }
1461                    create_block(r, i, ancestors, 0, vec![])
1462                })
1463                .collect();
1464            leaders.push(last_round_blocks[0].clone());
1465            fixture.add_blocks(last_round_blocks.clone());
1466        }
1467
1468        // Create CommittedSubDag from leaders.
1469        assert_eq!(leaders.len(), 4);
1470        let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
1471        assert_eq!(committed_sub_dags.len(), 4);
1472
1473        // Ensure 1 reject vote is contained in B2(1) in commit 0.
1474        assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
1475        // Ensure B2(2) is GC'ed.
1476        for commit in committed_sub_dags.iter() {
1477            assert!(!commit.blocks.contains(&round_2_blocks[2]));
1478        }
1479
1480        // Buffering the initial 3 commits should not finalize.
1481        for commit in committed_sub_dags.iter().take(3) {
1482            assert!(commit.decided_with_local_blocks);
1483            let finalized_commits = fixture
1484                .commit_finalizer
1485                .process_commit(commit.clone())
1486                .await;
1487            assert_eq!(finalized_commits.len(), 0);
1488        }
1489
1490        // Buffering the 4th commit should finalize all commits.
1491        let finalized_commits = fixture
1492            .commit_finalizer
1493            .process_commit(committed_sub_dags[3].clone())
1494            .await;
1495        assert_eq!(finalized_commits.len(), 4);
1496
1497        // Check rejected transactions.
1498        // B1(3) txn 1 gets rejected, even though there are has 3 blocks links to B1(3) without rejecting txn 1.
1499        // This is because there are only 2 accept votes for this transaction, which is less than the quorum threshold.
1500        let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
1501        assert_eq!(rejected_transactions.len(), 1);
1502        assert_eq!(
1503            rejected_transactions
1504                .get(&block_with_rejected_txn.reference())
1505                .unwrap(),
1506            &vec![1]
1507        );
1508
1509        // Other commits should have no rejected transactions.
1510        for commit in finalized_commits.iter().skip(1) {
1511            assert!(commit.rejected_transactions_by_block.is_empty());
1512        }
1513
1514        // CommitFinalizer should be empty.
1515        assert!(fixture.commit_finalizer.is_empty());
1516    }
1517
1518    #[tokio::test]
1519    async fn test_finalize_remote_commits_with_reject_votes() {
1520        let mut fixture: CommitTestFixture = create_commit_finalizer_fixture();
1521        let mut all_blocks = vec![];
1522
1523        // Create round 1 blocks with 10 transactions each.
1524        let mut dag_builder = DagBuilder::new(fixture.context.clone());
1525        dag_builder.layer(1).num_transactions(10).build();
1526        let round_1_blocks = dag_builder.all_blocks();
1527        all_blocks.push(round_1_blocks.clone());
1528
1529        // Collect leaders from round 1.
1530        let mut leaders = vec![round_1_blocks[0].clone()];
1531
1532        // Create round 2-9 blocks and set leaders until round 7.
1533        let mut last_round_blocks = round_1_blocks.clone();
1534        for r in 2..=9 {
1535            let ancestors: Vec<BlockRef> =
1536                last_round_blocks.iter().map(|b| b.reference()).collect();
1537            let round_blocks: Vec<_> = (0..4)
1538                .map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
1539                .collect();
1540            all_blocks.push(round_blocks.clone());
1541            if r <= 7 && r != 5 {
1542                leaders.push(round_blocks[r as usize % 4].clone());
1543            }
1544            last_round_blocks = round_blocks;
1545        }
1546
1547        // Leader rounds: 1, 2, 3, 4, 6, 7.
1548        assert_eq!(leaders.len(), 6);
1549
1550        async fn add_blocks_and_process_commit(
1551            fixture: &mut CommitTestFixture,
1552            leaders: &[VerifiedBlock],
1553            all_blocks: &[Vec<VerifiedBlock>],
1554            index: usize,
1555            local: bool,
1556        ) -> Vec<CommittedSubDag> {
1557            let leader = leaders[index].clone();
1558            // Add blocks related to the commit to DagState and TransactionCertifier.
1559            if local {
1560                for round_blocks in all_blocks.iter().take(leader.round() as usize + 2) {
1561                    fixture.add_blocks(round_blocks.clone());
1562                }
1563            } else {
1564                for round_blocks in all_blocks.iter().take(leader.round() as usize) {
1565                    fixture.add_blocks(round_blocks.clone());
1566                }
1567            };
1568            // Generate remote commit from leader.
1569            let mut committed_sub_dags = fixture.linearizer.handle_commit(vec![leader]);
1570            assert_eq!(committed_sub_dags.len(), 1);
1571            let mut remote_commit = committed_sub_dags.pop().unwrap();
1572            remote_commit.decided_with_local_blocks = local;
1573            // Process the remote commit.
1574            fixture
1575                .commit_finalizer
1576                .process_commit(remote_commit.clone())
1577                .await
1578        }
1579
1580        // Add commit 1-3 as remote commits. There should be no finalized commits.
1581        for i in 0..3 {
1582            let finalized_commits =
1583                add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, i, false).await;
1584            assert!(finalized_commits.is_empty());
1585        }
1586
1587        // Buffer round 4 commit as a remote commit. This should finalize the 1st commit at round 1.
1588        let finalized_commits =
1589            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 3, false).await;
1590        assert_eq!(finalized_commits.len(), 1);
1591        assert_eq!(finalized_commits[0].commit_ref.index, 1);
1592        assert_eq!(finalized_commits[0].leader.round, 1);
1593
1594        // Buffer round 6 (5th) commit as local commit. This should help finalize the commits at round 2 and 3.
1595        let finalized_commits =
1596            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 4, true).await;
1597        assert_eq!(finalized_commits.len(), 2);
1598        assert_eq!(finalized_commits[0].commit_ref.index, 2);
1599        assert_eq!(finalized_commits[0].leader.round, 2);
1600        assert_eq!(finalized_commits[1].commit_ref.index, 3);
1601        assert_eq!(finalized_commits[1].leader.round, 3);
1602
1603        // Buffer round 7 (6th) commit as local commit. This should help finalize the commits at round 4, 6 and 7 (itself).
1604        let finalized_commits =
1605            add_blocks_and_process_commit(&mut fixture, &leaders, &all_blocks, 5, true).await;
1606        assert_eq!(finalized_commits.len(), 3);
1607        assert_eq!(finalized_commits[0].commit_ref.index, 4);
1608        assert_eq!(finalized_commits[0].leader.round, 4);
1609        assert_eq!(finalized_commits[1].commit_ref.index, 5);
1610        assert_eq!(finalized_commits[1].leader.round, 6);
1611        assert_eq!(finalized_commits[2].commit_ref.index, 6);
1612        assert_eq!(finalized_commits[2].leader.round, 7);
1613
1614        // CommitFinalizer should be empty.
1615        assert!(fixture.commit_finalizer.is_empty());
1616    }
1617}