consensus_core/
transaction_certifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_common::ZipDebugEqIteratorExt;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13    BlockAPI as _, CertifiedBlock, VerifiedBlock,
14    block::{BlockTransactionVotes, GENESIS_ROUND},
15    block_verifier::BlockVerifier,
16    context::Context,
17    dag_state::DagState,
18    stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21/// TransactionCertifier has the following purposes:
22/// 1. Certifies transactions and sends them to execute on the fastpath.
23/// 2. Keeps track of own votes on transactions, and allows the votes to be retrieved
24///    later in core after acceptance of the blocks containing the transactions.
25/// 3. Aggregates reject votes on transactions, and allows the aggregated votes
26///    to be retrieved during post-commit finalization.
27///
28/// A transaction is certified if a quorum of authorities in the causal history of a proposed block
29/// vote to accept the transaction. Accept votes are implicit in blocks: if a transaction is in
30/// the causal history of a block and the block does not vote to reject it, the block
31/// is considered to vote to accept the transaction. Transaction finalization are eventually resolved
32/// post commit, by checking if there is a certification of the transaction in the causal history
33/// of the leader. So only accept votes are only considered if they are in the causal history of own
34/// proposed blocks.
35///
36/// A transaction is rejected if a quorum of authorities vote to reject it. When this happens, it is
37/// guaranteed that no validator can observe a certification of the transaction, with <= f malicious
38/// stake.
39///
40/// A block is certified if every transaction in the block is either certified or rejected.
41/// TransactionCertifier outputs certified blocks.
42///
43/// The invariant between TransactionCertifier and post-commit finalization is that if a quorum of
44/// authorities certified a transaction for fastpath and executed it, then the transaction
45/// must also be finalized post consensus commit. The reverse is not true though, because
46/// fastpath execution is only a latency optimization, and not required for correctness.
47#[derive(Clone)]
48pub struct TransactionCertifier {
49    // The state of blocks being voted on and certified.
50    certifier_state: Arc<RwLock<CertifierState>>,
51    // Verify transactions during recovery.
52    block_verifier: Arc<dyn BlockVerifier>,
53    // The state of the DAG.
54    dag_state: Arc<RwLock<DagState>>,
55}
56
57impl TransactionCertifier {
58    pub fn new(
59        context: Arc<Context>,
60        block_verifier: Arc<dyn BlockVerifier>,
61        dag_state: Arc<RwLock<DagState>>,
62    ) -> Self {
63        Self {
64            certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
65            block_verifier,
66            dag_state,
67        }
68    }
69
70    /// Recovers all blocks from DB after the given round.
71    ///
72    /// This is useful for initializing the certifier state
73    /// for future commits and block proposals.
74    pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
75        let context = self.certifier_state.read().context.clone();
76        if !context.protocol_config.transaction_voting_enabled() {
77            info!("Skipping certifier recovery in non-mysticeti fast path mode");
78            return;
79        }
80
81        let store = self.dag_state.read().store().clone();
82
83        let recovery_start_round = after_round + 1;
84        info!(
85            "Recovering certifier state from round {}",
86            recovery_start_round,
87        );
88
89        let authorities = context
90            .committee
91            .authorities()
92            .map(|(index, _)| index)
93            .collect::<Vec<_>>();
94        for authority_index in authorities {
95            let blocks = store
96                .scan_blocks_by_author(authority_index, recovery_start_round)
97                .unwrap();
98            info!(
99                "Recovered and voting on {} blocks from authority {} {}",
100                blocks.len(),
101                authority_index,
102                context.committee.authority(authority_index).hostname
103            );
104            self.recover_and_vote_on_blocks(blocks);
105        }
106    }
107
108    /// Recovers and potentially votes on the given blocks.
109    ///
110    /// Because own votes on blocks are not stored, during recovery it is necessary to vote on
111    /// input blocks that are above GC round and have not been included before, which can be
112    /// included in a future proposed block.
113    ///
114    /// In addition, add_voted_blocks() will eventually process reject votes contained in the input blocks.
115    pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
116        let context = self.certifier_state.read().context.clone();
117        let should_vote_blocks = {
118            let dag_state = self.dag_state.read();
119            let gc_round = dag_state.gc_round();
120            blocks
121                .iter()
122                // Must make sure the block is above GC round before calling has_been_included().
123                .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
124                .collect::<Vec<_>>()
125        };
126        let voted_blocks = blocks
127            .into_iter()
128            .zip_debug_eq(should_vote_blocks)
129            .map(|(b, should_vote)| {
130                if !should_vote {
131                    // Voting is unnecessary for blocks already included in own proposed blocks,
132                    // or outside of local DAG GC bound.
133                    (b, vec![])
134                } else {
135                    // Voting is needed for blocks above GC round and not yet included in own proposed blocks.
136                    // A block proposal can include the input block later and retries own votes on it.
137                    let reject_transaction_votes =
138                        self.block_verifier.vote(&b).unwrap_or_else(|e| {
139                            panic!(
140                                "Failed to vote on block {} (own_index={}) during recovery: {}",
141                                b.reference(),
142                                context.own_index,
143                                e
144                            )
145                        });
146                    (b, reject_transaction_votes)
147                }
148            })
149            .collect::<Vec<_>>();
150        self.certifier_state.write().add_voted_blocks(voted_blocks);
151        // Do not send certified blocks to the fastpath output channel during recovery,
152        // because these transactions could have been executed and fastpath latency optimization is
153        // unnecessary for recovered transactions.
154    }
155
156    /// Stores own reject votes on input blocks, and aggregates reject votes from the input blocks.
157    pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
158        self.certifier_state.write().add_voted_blocks(voted_blocks);
159    }
160
161    /// Aggregates accept votes from the own proposed block.
162    pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
163        self.certifier_state
164            .write()
165            .add_proposed_block(proposed_block);
166    }
167
168    /// Retrieves own votes on peer block transactions.
169    pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
170        let mut votes = vec![];
171        let certifier_state = self.certifier_state.read();
172        for block_ref in block_refs {
173            if block_ref.round <= certifier_state.gc_round {
174                continue;
175            }
176            let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
177                panic!("Ancestor block {} not found in certifier state", block_ref)
178            });
179            if !vote_info.own_reject_txn_votes.is_empty() {
180                votes.push(BlockTransactionVotes {
181                    block_ref,
182                    rejects: vote_info.own_reject_txn_votes.clone(),
183                });
184            }
185        }
186        votes
187    }
188
189    /// Retrieves transactions in the block that have received reject votes, and the total stake of the votes.
190    /// TransactionIndex not included in the output has no reject votes.
191    /// Returns None if no information is found for the block.
192    pub(crate) fn get_reject_votes(
193        &self,
194        block_ref: &BlockRef,
195    ) -> Option<Vec<(TransactionIndex, Stake)>> {
196        let accumulated_reject_votes = self
197            .certifier_state
198            .read()
199            .votes
200            .get(block_ref)?
201            .reject_txn_votes
202            .iter()
203            .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
204            .collect::<Vec<_>>();
205        Some(accumulated_reject_votes)
206    }
207
208    /// Runs garbage collection on the internal state by removing data for blocks <= gc_round,
209    /// and updates the GC round for the certifier.
210    ///
211    /// IMPORTANT: the gc_round used here can trail the latest gc_round from DagState.
212    /// This is because the gc round here is determined by CommitFinalizer, which needs to process
213    /// commits before the latest commit in DagState. Reject votes received by transactions below
214    /// local DAG gc_round may still need to be accessed from CommitFinalizer.
215    pub(crate) fn run_gc(&self, gc_round: Round) {
216        let dag_state_gc_round = self.dag_state.read().gc_round();
217        assert!(
218            gc_round <= dag_state_gc_round,
219            "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
220            gc_round,
221            dag_state_gc_round
222        );
223        self.certifier_state.write().update_gc_round(gc_round);
224    }
225}
226
227/// CertifierState keeps track of votes received by each transaction and block,
228/// and helps determine if votes reach a quorum. Reject votes can start accumulating
229/// even before the target block is received by this authority.
230struct CertifierState {
231    context: Arc<Context>,
232
233    // Maps received blocks' refs to votes on those blocks from other blocks.
234    // Even if a block has no reject votes on its transactions, it still has an entry here.
235    votes: BTreeMap<BlockRef, VoteInfo>,
236
237    // Highest round where blocks are GC'ed.
238    gc_round: Round,
239}
240
241impl CertifierState {
242    fn new(context: Arc<Context>) -> Self {
243        Self {
244            context,
245            votes: BTreeMap::new(),
246            gc_round: GENESIS_ROUND,
247        }
248    }
249
250    fn add_voted_blocks(
251        &mut self,
252        voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
253    ) -> Vec<CertifiedBlock> {
254        let mut certified_blocks = vec![];
255        for (voted_block, reject_txn_votes) in voted_blocks {
256            let blocks = self.add_voted_block(voted_block, reject_txn_votes);
257            certified_blocks.extend(blocks);
258        }
259
260        if !certified_blocks.is_empty() {
261            self.context
262                .metrics
263                .node_metrics
264                .certifier_output_blocks
265                .with_label_values(&["voted"])
266                .inc_by(certified_blocks.len() as u64);
267        }
268
269        certified_blocks
270    }
271
272    fn add_voted_block(
273        &mut self,
274        voted_block: VerifiedBlock,
275        reject_txn_votes: Vec<TransactionIndex>,
276    ) -> Vec<CertifiedBlock> {
277        if voted_block.round() <= self.gc_round {
278            // Ignore the block and own votes, since they are outside of certifier GC bound.
279            return vec![];
280        }
281
282        // Count own reject votes against each peer authority.
283        let peer_hostname = &self
284            .context
285            .committee
286            .authority(voted_block.author())
287            .hostname;
288        self.context
289            .metrics
290            .node_metrics
291            .certifier_own_reject_votes
292            .with_label_values(&[peer_hostname])
293            .inc_by(reject_txn_votes.len() as u64);
294
295        // Initialize the entry for the voted block.
296        let vote_info = self.votes.entry(voted_block.reference()).or_default();
297        if vote_info.block.is_some() {
298            // Input block has already been processed and added to the state.
299            return vec![];
300        }
301        vote_info.block = Some(voted_block.clone());
302        vote_info.own_reject_txn_votes = reject_txn_votes;
303
304        let mut certified_blocks = vec![];
305
306        let now = self.context.clock.timestamp_utc_ms();
307
308        // Update reject votes from the input block.
309        for block_votes in voted_block.transaction_votes() {
310            if block_votes.block_ref.round <= self.gc_round {
311                // Block is outside of GC bound.
312                continue;
313            }
314            let vote_info = self.votes.entry(block_votes.block_ref).or_default();
315            for reject in &block_votes.rejects {
316                vote_info
317                    .reject_txn_votes
318                    .entry(*reject)
319                    .or_default()
320                    .add_unique(voted_block.author(), &self.context.committee);
321            }
322            // Check if the target block is now certified after including the reject votes.
323            // NOTE: votes can already exist for the target block and its transactions.
324            if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
325                let authority_name = self
326                    .context
327                    .committee
328                    .authority(certified_block.block.author())
329                    .hostname
330                    .clone();
331                self.context
332                    .metrics
333                    .node_metrics
334                    .certifier_block_latency
335                    .with_label_values(&[&authority_name])
336                    .observe(
337                        Duration::from_millis(
338                            now.saturating_sub(certified_block.block.timestamp_ms()),
339                        )
340                        .as_secs_f64(),
341                    );
342                certified_blocks.push(certified_block);
343            }
344        }
345
346        certified_blocks
347    }
348
349    fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
350        if proposed_block.round() <= self.gc_round + 2 {
351            // Skip if transactions that can be certified have already been GC'ed.
352            // Skip also when the proposed block has been GC'ed from the certifier state.
353            // This is possible because this function (add_proposed_block()) is async from
354            // commit finalization, which advances the GC round of the certifier.
355            return vec![];
356        }
357        debug!(
358            "Adding proposed block {}; gc round: {}",
359            proposed_block.reference(),
360            self.gc_round
361        );
362
363        if !self.votes.contains_key(&proposed_block.reference()) {
364            self.context
365                .metrics
366                .node_metrics
367                .certifier_missing_ancestor_during_certification
368                .with_label_values(&["proposed_block_not_found"])
369                .inc();
370            debug!(
371                "Proposed block {} not found in certifier state. GC round: {}",
372                proposed_block.reference(),
373                self.gc_round,
374            );
375            return vec![];
376        }
377
378        let now = self.context.clock.timestamp_utc_ms();
379
380        // Certify transactions based on the accept votes from the proposed block's parents.
381        // Some ancestor blocks may not be found, because either they have been GC'ed due to timing
382        // issues or they were not recovered. It is ok to skip certifying blocks, which are best effort.
383        let mut certified_blocks = vec![];
384        for voting_ancestor in proposed_block.ancestors() {
385            // Votes are limited to 1 round before the proposed block.
386            if voting_ancestor.round + 1 != proposed_block.round() {
387                continue;
388            }
389            let Some(voting_info) = self.votes.get(voting_ancestor) else {
390                self.context
391                    .metrics
392                    .node_metrics
393                    .certifier_missing_ancestor_during_certification
394                    .with_label_values(&["voting_info_not_found"])
395                    .inc();
396                debug!(
397                    "Proposed block {}: voting info not found for ancestor {}",
398                    proposed_block.reference(),
399                    voting_ancestor
400                );
401                continue;
402            };
403            let Some(voting_block) = voting_info.block.clone() else {
404                self.context
405                    .metrics
406                    .node_metrics
407                    .certifier_missing_ancestor_during_certification
408                    .with_label_values(&["voting_block_not_found"])
409                    .inc();
410                debug!(
411                    "Proposed block {}: voting block not found for ancestor {}",
412                    proposed_block.reference(),
413                    voting_ancestor
414                );
415                continue;
416            };
417            for target_ancestor in voting_block.ancestors() {
418                // Target blocks are 1 round before the voting block.
419                if target_ancestor.round + 1 != voting_block.round() {
420                    continue;
421                }
422                let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
423                    self.context
424                        .metrics
425                        .node_metrics
426                        .certifier_missing_ancestor_during_certification
427                        .with_label_values(&["target_vote_info_not_found"])
428                        .inc();
429                    debug!(
430                        "Proposed block {}: target voting info not found for ancestor {}",
431                        proposed_block.reference(),
432                        target_ancestor
433                    );
434                    continue;
435                };
436                target_vote_info
437                    .accept_block_votes
438                    .add_unique(voting_block.author(), &self.context.committee);
439                // Check if the target block is now certified after including the accept votes.
440                if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
441                {
442                    let authority_name = self
443                        .context
444                        .committee
445                        .authority(certified_block.block.author())
446                        .hostname
447                        .clone();
448                    self.context
449                        .metrics
450                        .node_metrics
451                        .certifier_block_latency
452                        .with_label_values(&[&authority_name])
453                        .observe(
454                            Duration::from_millis(
455                                now.saturating_sub(certified_block.block.timestamp_ms()),
456                            )
457                            .as_secs_f64(),
458                        );
459                    certified_blocks.push(certified_block);
460                }
461            }
462        }
463
464        if !certified_blocks.is_empty() {
465            self.context
466                .metrics
467                .node_metrics
468                .certifier_output_blocks
469                .with_label_values(&["proposed"])
470                .inc_by(certified_blocks.len() as u64);
471        }
472
473        certified_blocks
474    }
475
476    /// Updates the GC round and cleans up obsolete internal state.
477    fn update_gc_round(&mut self, gc_round: Round) {
478        self.gc_round = gc_round;
479        while let Some((block_ref, _)) = self.votes.first_key_value() {
480            if block_ref.round <= self.gc_round {
481                self.votes.pop_first();
482            } else {
483                break;
484            }
485        }
486
487        self.context
488            .metrics
489            .node_metrics
490            .certifier_gc_round
491            .set(self.gc_round as i64);
492    }
493}
494
495/// VoteInfo keeps track of votes received for each transaction of this block,
496/// possibly even before the block is received by this authority.
497struct VoteInfo {
498    // Content of the block.
499    // None if the blocks has not been received.
500    block: Option<VerifiedBlock>,
501    // Rejection votes by this authority on this block.
502    // This field is written when the block is first received and its transactions are voted on.
503    // It is read from core after the block is accepted.
504    own_reject_txn_votes: Vec<TransactionIndex>,
505    // Accumulates implicit accept votes for the block and all transactions.
506    accept_block_votes: StakeAggregator<QuorumThreshold>,
507    // Accumulates reject votes per transaction in this block.
508    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
509    // Whether this block has been certified already.
510    is_certified: bool,
511}
512
513impl VoteInfo {
514    // If this block can now be certified, returns the output.
515    // Otherwise, returns None.
516    fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
517        let committee = &context.committee;
518        if self.is_certified {
519            // Skip if already certified.
520            return None;
521        }
522        let Some(block) = self.block.as_ref() else {
523            // Skip if the content of the block has not been received.
524            return None;
525        };
526
527        let peer_hostname = &committee.authority(block.author()).hostname;
528
529        if !self.accept_block_votes.reached_threshold(committee) {
530            // Skip if the block is not certified.
531            return None;
532        }
533        let mut rejected = vec![];
534        for (idx, reject_txn_votes) in &self.reject_txn_votes {
535            // The transaction is voted to be rejected.
536            if reject_txn_votes.reached_threshold(committee) {
537                context
538                    .metrics
539                    .node_metrics
540                    .certifier_rejected_transactions
541                    .with_label_values(&[peer_hostname])
542                    .inc();
543                rejected.push(*idx);
544                continue;
545            }
546            // If a transaction does not have a quorum of accept votes minus the reject votes,
547            // it is neither rejected nor certified. In this case the whole block cannot
548            // be considered as certified.
549
550            // accept_block_votes can be < reject_txn_votes on the transaction when reject_txn_votes
551            // come from blocks more than 1 round higher, which do not add to the
552            // accept votes of the block.
553            //
554            // Also, the total accept votes of a transactions is undercounted here.
555            // If a block has accept votes from a quorum of authorities A, B and C, but one transaction
556            // has a reject vote from D, the transaction and block are technically certified
557            // and can be sent to fastpath. However, the computation here will not certify the transaction
558            // or the block. This is still fine because the fastpath certification is optional.
559            // The definite status of the transaction will be decided during post commit finalization.
560            if self
561                .accept_block_votes
562                .stake()
563                .saturating_sub(reject_txn_votes.stake())
564                < committee.quorum_threshold()
565            {
566                return None;
567            }
568        }
569        // The block is certified.
570        let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
571        tracing::trace!(
572            "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
573            block.reference(),
574            rejected.len()
575        );
576        context
577            .metrics
578            .node_metrics
579            .certifier_accepted_transactions
580            .with_label_values(&[peer_hostname])
581            .inc_by(accepted_txn_count as u64);
582        self.is_certified = true;
583        Some(CertifiedBlock {
584            block: block.clone(),
585            rejected,
586        })
587    }
588}
589
590impl Default for VoteInfo {
591    fn default() -> Self {
592        Self {
593            block: None,
594            own_reject_txn_votes: vec![],
595            accept_block_votes: StakeAggregator::new(),
596            reject_txn_votes: BTreeMap::new(),
597            is_certified: false,
598        }
599    }
600}
601
602#[cfg(test)]
603mod test {
604    use consensus_config::AuthorityIndex;
605    use itertools::Itertools;
606    use rand::seq::SliceRandom as _;
607
608    use crate::{
609        TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
610        test_dag_builder::DagBuilder,
611    };
612
613    use super::*;
614
615    #[tokio::test]
616    async fn test_vote_info_basic() {
617        telemetry_subscribers::init_for_testing();
618        let (context, _) = Context::new_for_test(7);
619        let committee = &context.committee;
620
621        // No accept votes.
622        {
623            let mut vote_info = VoteInfo::default();
624            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
625            vote_info.block = Some(block.clone());
626
627            assert!(vote_info.take_certified_output(&context).is_none());
628        }
629
630        // Accept votes but not enough.
631        {
632            let mut vote_info = VoteInfo::default();
633            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
634            vote_info.block = Some(block.clone());
635            for i in 0..4 {
636                vote_info
637                    .accept_block_votes
638                    .add_unique(AuthorityIndex::new_for_test(i), committee);
639            }
640
641            assert!(vote_info.take_certified_output(&context).is_none());
642        }
643
644        // Enough accept votes but no block.
645        {
646            let mut vote_info = VoteInfo::default();
647            for i in 0..5 {
648                vote_info
649                    .accept_block_votes
650                    .add_unique(AuthorityIndex::new_for_test(i), committee);
651            }
652
653            assert!(vote_info.take_certified_output(&context).is_none());
654        }
655
656        // A quorum of accept votes and block exists.
657        {
658            let mut vote_info = VoteInfo::default();
659            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
660            vote_info.block = Some(block.clone());
661            for i in 0..4 {
662                vote_info
663                    .accept_block_votes
664                    .add_unique(AuthorityIndex::new_for_test(i), committee);
665            }
666
667            // The block is not certified.
668            assert!(vote_info.take_certified_output(&context).is_none());
669
670            // Add 1 more accept vote from a different authority.
671            vote_info
672                .accept_block_votes
673                .add_unique(AuthorityIndex::new_for_test(4), committee);
674
675            // The block is now certified.
676            let certified_block = vote_info.take_certified_output(&context).unwrap();
677            assert_eq!(certified_block.block.reference(), block.reference());
678
679            // Certified block cannot be taken again.
680            assert!(vote_info.take_certified_output(&context).is_none());
681        }
682
683        // A quorum of accept and reject votes.
684        {
685            let mut vote_info = VoteInfo::default();
686            // Create a block with 7 transactions.
687            let block = VerifiedBlock::new_for_test(
688                TestBlock::new(1, 1)
689                    .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
690                    .build(),
691            );
692            vote_info.block = Some(block.clone());
693            // Add 5 accept votes which form a quorum.
694            for i in 0..5 {
695                vote_info
696                    .accept_block_votes
697                    .add_unique(AuthorityIndex::new_for_test(i), committee);
698            }
699            // For transactions 3 - 7 ..
700            for reject_tx_idx in 3..8 {
701                vote_info
702                    .reject_txn_votes
703                    .insert(reject_tx_idx, StakeAggregator::new());
704                // .. add 5 reject votes which form a quorum.
705                for authority_idx in 0..5 {
706                    vote_info
707                        .reject_txn_votes
708                        .get_mut(&reject_tx_idx)
709                        .unwrap()
710                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
711                }
712            }
713
714            // The block is certified.
715            let certified_block = vote_info.take_certified_output(&context).unwrap();
716            assert_eq!(certified_block.block.reference(), block.reference());
717
718            // Certified block cannot be taken again.
719            assert!(vote_info.take_certified_output(&context).is_none());
720        }
721
722        // A transaction in the block is neither rejected nor certified.
723        {
724            let mut vote_info = VoteInfo::default();
725            // Create a block with 6 transactions.
726            let block = VerifiedBlock::new_for_test(
727                TestBlock::new(1, 1)
728                    .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
729                    .build(),
730            );
731            vote_info.block = Some(block.clone());
732            // Add 5 accept votes which form a quorum.
733            for i in 0..5 {
734                vote_info
735                    .accept_block_votes
736                    .add_unique(AuthorityIndex::new_for_test(i), committee);
737            }
738            // For transactions 3 - 5 ..
739            for reject_tx_idx in 3..6 {
740                vote_info
741                    .reject_txn_votes
742                    .insert(reject_tx_idx, StakeAggregator::new());
743                // .. add 5 reject votes which form a quorum.
744                for authority_idx in 0..5 {
745                    vote_info
746                        .reject_txn_votes
747                        .get_mut(&reject_tx_idx)
748                        .unwrap()
749                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
750                }
751            }
752            // For transaction 6, add 4 reject votes which do not form a quorum.
753            vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
754            for authority_idx in 0..4 {
755                vote_info
756                    .reject_txn_votes
757                    .get_mut(&5)
758                    .unwrap()
759                    .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
760            }
761
762            // The block is not certified.
763            assert!(vote_info.take_certified_output(&context).is_none());
764
765            // Add 1 more accept vote from a different authority for transaction 6.
766            vote_info
767                .reject_txn_votes
768                .get_mut(&5)
769                .unwrap()
770                .add_unique(AuthorityIndex::new_for_test(4), committee);
771
772            // The block is now certified.
773            let certified_block = vote_info.take_certified_output(&context).unwrap();
774            assert_eq!(certified_block.block.reference(), block.reference());
775
776            // Certified block cannot be taken again.
777            assert!(vote_info.take_certified_output(&context).is_none());
778        }
779    }
780
781    #[tokio::test]
782    async fn test_certify_basic() {
783        telemetry_subscribers::init_for_testing();
784        let (context, _) = Context::new_for_test(4);
785        let context = Arc::new(context);
786
787        // GIVEN: Round 1: blocks from all authorities are fully connected to the genesis blocks.
788        let mut dag_builder = DagBuilder::new(context.clone());
789        dag_builder.layer(1).num_transactions(4).build();
790        let round_1_blocks = dag_builder.all_blocks();
791        let mut all_blocks = round_1_blocks.clone();
792
793        // THEN: Round 1: no block can be certified yet.
794        let mut certifier = CertifierState::new(context.clone());
795        let certified_blocks = certifier
796            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
797        assert!(certified_blocks.is_empty());
798
799        // GIVEN: Round 2: A, B & C blocks at round 2 are connected to only A, B & C blocks at round 1.
800        // AND: A & B blocks reject transaction 1 from the round 1 B block.
801        // AND: A, B & C blocks reject transaction 2 from the round 1 C block.
802        let transactions = (0..4)
803            .map(|_| Transaction::new(vec![0_u8; 16]))
804            .collect::<Vec<_>>();
805        let ancestors = round_1_blocks
806            .iter()
807            .filter_map(|b| {
808                if b.author().value() < 3 {
809                    Some(b.reference())
810                } else {
811                    None
812                }
813            })
814            .collect::<Vec<_>>();
815        for author in 0..3 {
816            let mut block = TestBlock::new(2, author)
817                .set_ancestors_raw(ancestors.clone())
818                .set_transactions(transactions.clone());
819            let mut votes = vec![];
820            for i in 0..(3 - author) {
821                let j = author + i;
822                if j == 0 {
823                    // Do not reject transaction 0 from the round 1 A block.
824                    continue;
825                }
826                votes.push(BlockTransactionVotes {
827                    block_ref: round_1_blocks[j as usize].reference(),
828                    rejects: vec![j as u16],
829                });
830            }
831            block = block.set_transaction_votes(votes);
832            all_blocks.push(VerifiedBlock::new_for_test(block.build()));
833        }
834
835        // THEN: Round 2: no block can be certified yet.
836        let mut certifier = CertifierState::new(context.clone());
837        let certified_blocks =
838            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
839        assert!(certified_blocks.is_empty());
840
841        // GIVEN: Round 3: all blocks connected to round 2 blocks and round 1 D block,
842        let ancestors = all_blocks
843            .iter()
844            .filter_map(|b| {
845                if b.round() == 1 && b.author().value() == 3 {
846                    Some(b.reference())
847                } else if b.round() == 2 {
848                    assert_ne!(b.author().value(), 3);
849                    Some(b.reference())
850                } else {
851                    None
852                }
853            })
854            .collect::<Vec<_>>();
855        assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
856        let mut round_3_blocks = vec![];
857        for author in 0..4 {
858            let block = TestBlock::new(3, author)
859                .set_ancestors_raw(ancestors.clone())
860                .set_transactions(transactions.clone());
861            round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
862        }
863
864        // THEN: Round 3: with 1 round 3 block, A & C round 1 blocks are certified.
865        let mut certifier = CertifierState::new(context.clone());
866        certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
867        let proposed_block = round_3_blocks.pop().unwrap();
868        let mut certified_blocks =
869            certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
870        certified_blocks.extend(certifier.add_proposed_block(proposed_block));
871        assert_eq!(
872            certified_blocks.len(),
873            2,
874            "Certified blocks {:?}",
875            certified_blocks
876                .iter()
877                .map(|b| b.block.reference().to_string())
878                .join(",")
879        );
880        assert_eq!(
881            certified_blocks[0].block.reference(),
882            round_1_blocks[0].reference()
883        );
884        assert!(certified_blocks[0].rejected.is_empty());
885        assert_eq!(
886            certified_blocks[1].block.reference(),
887            round_1_blocks[2].reference()
888        );
889        assert_eq!(certified_blocks[1].rejected, vec![2]);
890    }
891
892    // TODO: add reject votes.
893    #[tokio::test]
894    async fn test_certify_randomized() {
895        telemetry_subscribers::init_for_testing();
896        let num_authorities: u32 = 7;
897        let (context, _) = Context::new_for_test(num_authorities as usize);
898        let context = Arc::new(context);
899
900        // Create minimal connected blocks up to num_rounds.
901        let num_rounds = 50;
902        let mut dag_builder = DagBuilder::new(context.clone());
903        dag_builder
904            .layers(1..=num_rounds)
905            .min_ancestor_links(false, None)
906            .build();
907        let all_blocks = dag_builder.all_blocks();
908
909        // Get the certified blocks, which depends on the structure of the minimum connected DAG.
910        let mut certifier = CertifierState::new(context.clone());
911        let mut expected_certified_blocks =
912            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
913        expected_certified_blocks.sort_by_key(|b| b.block.reference());
914
915        // Adding all blocks to certifier in random order should still produce the same set of certified blocks.
916        for _ in 0..100 {
917            // Add the blocks to certifier in random order.
918            let mut all_blocks = all_blocks.clone();
919            all_blocks.shuffle(&mut rand::thread_rng());
920            let mut certifier = CertifierState::new(context.clone());
921
922            // Take the certified blocks.
923            let mut actual_certified_blocks = certifier
924                .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
925            actual_certified_blocks.sort_by_key(|b| b.block.reference());
926
927            // Ensure the certified blocks are the expected ones.
928            assert_eq!(
929                actual_certified_blocks.len(),
930                expected_certified_blocks.len()
931            );
932            for (actual, expected) in actual_certified_blocks
933                .iter()
934                .zip_debug_eq(expected_certified_blocks.iter())
935            {
936                assert_eq!(actual.block.reference(), expected.block.reference());
937                assert_eq!(actual.rejected, expected.rejected);
938            }
939        }
940    }
941}