consensus_core/
transaction_certifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_metrics::monitored_mpsc::UnboundedSender;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13    BlockAPI as _, CertifiedBlock, CertifiedBlocksOutput, VerifiedBlock,
14    block::{BlockTransactionVotes, GENESIS_ROUND},
15    block_verifier::BlockVerifier,
16    context::Context,
17    dag_state::DagState,
18    stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
/// TransactionCertifier has the following purposes:
/// 1. Certifies transactions and sends them to execute on the fastpath.
/// 2. Keeps track of own votes on transactions, and allows the votes to be retrieved
///    later in core after acceptance of the blocks containing the transactions.
/// 3. Aggregates reject votes on transactions, and allows the aggregated votes
///    to be retrieved during post-commit finalization.
///
/// A transaction is certified if a quorum of authorities in the causal history of a proposed block
/// vote to accept the transaction. Accept votes are implicit in blocks: if a transaction is in
/// the causal history of a block and the block does not vote to reject it, the block
/// is considered to vote to accept the transaction. Transaction finalization is eventually resolved
/// post commit, by checking if there is a certification of the transaction in the causal history
/// of the leader. So accept votes are only considered if they are in the causal history of own
/// proposed blocks.
///
/// A transaction is rejected if a quorum of authorities vote to reject it. When this happens, it is
/// guaranteed that no validator can observe a certification of the transaction, with <= f malicious
/// stake.
///
/// A block is certified if every transaction in the block is either certified or rejected.
/// TransactionCertifier outputs certified blocks.
///
/// The invariant between TransactionCertifier and post-commit finalization is that if a quorum of
/// authorities certified a transaction for fastpath and executed it, then the transaction
/// must also be finalized post consensus commit. The reverse is not true though, because
/// fastpath execution is only a latency optimization, and not required for correctness.
///
/// Cloning a TransactionCertifier clones its `Arc` handles, so clones share the same
/// certifier state, block verifier and DAG state.
#[derive(Clone)]
pub struct TransactionCertifier {
    // The state of blocks being voted on and certified, shared behind a lock.
    certifier_state: Arc<RwLock<CertifierState>>,
    // Used to vote on the transactions of blocks during recovery.
    block_verifier: Arc<dyn BlockVerifier>,
    // The state of the DAG. Read for the block store, the DAG GC round and hard-link checks.
    dag_state: Arc<RwLock<DagState>>,
    // An unbounded channel to output certified blocks to Sui consensus block handler.
    certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
}
58
59impl TransactionCertifier {
60    pub fn new(
61        context: Arc<Context>,
62        block_verifier: Arc<dyn BlockVerifier>,
63        dag_state: Arc<RwLock<DagState>>,
64        certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
65    ) -> Self {
66        Self {
67            certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
68            block_verifier,
69            dag_state,
70            certified_blocks_sender,
71        }
72    }
73
74    /// Recovers all blocks from DB after the given round.
75    ///
76    /// This is useful for initializing the certifier state
77    /// for future commits and block proposals.
78    pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
79        let context = self.certifier_state.read().context.clone();
80        if !context.protocol_config.mysticeti_fastpath() {
81            info!("Skipping certifier recovery in non-mysticeti fast path mode");
82            return;
83        }
84
85        let store = self.dag_state.read().store().clone();
86
87        let recovery_start_round = after_round + 1;
88        info!(
89            "Recovering certifier state from round {}",
90            recovery_start_round,
91        );
92
93        let authorities = context
94            .committee
95            .authorities()
96            .map(|(index, _)| index)
97            .collect::<Vec<_>>();
98        for authority_index in authorities {
99            let blocks = store
100                .scan_blocks_by_author(authority_index, recovery_start_round)
101                .unwrap();
102            info!(
103                "Recovering and voting on {} blocks for authority {} {}",
104                blocks.len(),
105                authority_index,
106                context.committee.authority(authority_index).hostname
107            );
108            self.recover_and_vote_on_blocks(blocks);
109        }
110    }
111
112    /// Recovers and votes on the given blocks.
113    ///
114    /// Reject votes contained in the input blocks are processed.
115    ///
116    /// Because own votes on blocks are not stored, input blocks are voted on if they can be
117    /// included in a future proposed block.
118    pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
119        let dag_state = self.dag_state.read();
120        let voted_blocks = blocks
121            .into_iter()
122            .map(|b| {
123                if b.round() <= dag_state.gc_round() || dag_state.is_hard_linked(&b.reference()) {
124                    // Voting is unnecessary for blocks already included in own proposed blocks,
125                    // or outside of local DAG GC bound.
126                    (b, vec![])
127                } else {
128                    // Voting is needed for blocks not yet included in own proposed blocks.
129                    // When a block proposal includes the input block, votes on the input block will be added to the proposed block.
130                    let reject_transaction_votes =
131                        self.block_verifier.vote(&b).unwrap_or_else(|e| {
132                            panic!("Failed to vote on block during recovery: {}", e)
133                        });
134                    (b, reject_transaction_votes)
135                }
136            })
137            .collect::<Vec<_>>();
138        self.certifier_state.write().add_voted_blocks(voted_blocks);
139        // Do not send certified blocks to the fastpath output channel during recovery,
140        // because these transactions could have been executed and fastpath latency optimization is
141        // unnecessary for recovered transactions.
142    }
143
144    /// Stores own reject votes on input blocks, and aggregates reject votes from the input blocks.
145    /// Newly certified blocks are sent to the fastpath output channel.
146    pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
147        let certified_blocks = self.certifier_state.write().add_voted_blocks(voted_blocks);
148        self.send_certified_blocks(certified_blocks);
149    }
150
151    /// Aggregates accept votes from the own proposed block.
152    /// Newly certified blocks are sent to the fastpath output channel.
153    pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
154        let certified_blocks = self
155            .certifier_state
156            .write()
157            .add_proposed_block(proposed_block);
158        self.send_certified_blocks(certified_blocks);
159    }
160
161    // Sends certified blocks to the fastpath output channel.
162    fn send_certified_blocks(&self, certified_blocks: Vec<CertifiedBlock>) {
163        if certified_blocks.is_empty() {
164            return;
165        }
166        if let Err(e) = self.certified_blocks_sender.send(CertifiedBlocksOutput {
167            blocks: certified_blocks,
168        }) {
169            tracing::warn!("Failed to send certified blocks: {:?}", e);
170        }
171    }
172
173    /// Retrieves own votes on peer block transactions.
174    pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
175        let mut votes = vec![];
176        let certifier_state = self.certifier_state.read();
177        for block_ref in block_refs {
178            if block_ref.round <= certifier_state.gc_round {
179                continue;
180            }
181            let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
182                panic!("Ancestor block {} not found in certifier state", block_ref)
183            });
184            if !vote_info.own_reject_txn_votes.is_empty() {
185                votes.push(BlockTransactionVotes {
186                    block_ref,
187                    rejects: vote_info.own_reject_txn_votes.clone(),
188                });
189            }
190        }
191        votes
192    }
193
194    /// Retrieves transactions in the block that have received reject votes, and the total stake of the votes.
195    /// TransactionIndex not included in the output has no reject votes.
196    /// Returns None if no information is found for the block.
197    pub(crate) fn get_reject_votes(
198        &self,
199        block_ref: &BlockRef,
200    ) -> Option<Vec<(TransactionIndex, Stake)>> {
201        let accumulated_reject_votes = self
202            .certifier_state
203            .read()
204            .votes
205            .get(block_ref)?
206            .reject_txn_votes
207            .iter()
208            .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
209            .collect::<Vec<_>>();
210        Some(accumulated_reject_votes)
211    }
212
213    /// Runs garbage collection on the internal state by removing data for blocks <= gc_round,
214    /// and updates the GC round for the certifier.
215    ///
216    /// IMPORTANT: the gc_round used here can trail the latest gc_round from DagState.
217    /// This is because the gc round here is determined by CommitFinalizer, which needs to process
218    /// commits before the latest commit in DagState. Reject votes received by transactions below
219    /// local DAG gc_round may still need to be accessed from CommitFinalizer.
220    pub(crate) fn run_gc(&self, gc_round: Round) {
221        let dag_state_gc_round = self.dag_state.read().gc_round();
222        assert!(
223            gc_round <= dag_state_gc_round,
224            "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
225            gc_round,
226            dag_state_gc_round
227        );
228        self.certifier_state.write().update_gc_round(gc_round);
229    }
230}
231
/// CertifierState keeps track of votes received by each transaction and block,
/// and helps determine if votes reach a quorum. Reject votes can start accumulating
/// even before the target block is received by this authority.
struct CertifierState {
    // Shared context, used here for the committee, the clock and metrics.
    context: Arc<Context>,

    // Maps received blocks' refs to votes on those blocks from other blocks.
    // Even if a block has no reject votes on its transactions, it still has an entry here.
    // An entry can also exist before its block is received, when another block has
    // already cast reject votes on it.
    votes: BTreeMap<BlockRef, VoteInfo>,

    // Highest round where blocks are GC'ed.
    // Blocks and votes at or below this round are ignored and removed from `votes`.
    gc_round: Round,
}
245
246impl CertifierState {
247    fn new(context: Arc<Context>) -> Self {
248        Self {
249            context,
250            votes: BTreeMap::new(),
251            gc_round: GENESIS_ROUND,
252        }
253    }
254
255    fn add_voted_blocks(
256        &mut self,
257        voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
258    ) -> Vec<CertifiedBlock> {
259        let mut certified_blocks = vec![];
260        for (voted_block, reject_txn_votes) in voted_blocks {
261            let blocks = self.add_voted_block(voted_block, reject_txn_votes);
262            certified_blocks.extend(blocks);
263        }
264
265        if !certified_blocks.is_empty() {
266            self.context
267                .metrics
268                .node_metrics
269                .certifier_output_blocks
270                .with_label_values(&["voted"])
271                .inc_by(certified_blocks.len() as u64);
272        }
273
274        certified_blocks
275    }
276
277    fn add_voted_block(
278        &mut self,
279        voted_block: VerifiedBlock,
280        reject_txn_votes: Vec<TransactionIndex>,
281    ) -> Vec<CertifiedBlock> {
282        if voted_block.round() <= self.gc_round {
283            // Ignore the block and own votes, since they are outside of certifier GC bound.
284            return vec![];
285        }
286
287        // Count own reject votes against each peer authority.
288        let peer_hostname = &self
289            .context
290            .committee
291            .authority(voted_block.author())
292            .hostname;
293        self.context
294            .metrics
295            .node_metrics
296            .certifier_own_reject_votes
297            .with_label_values(&[peer_hostname])
298            .inc_by(reject_txn_votes.len() as u64);
299
300        // Initialize the entry for the voted block.
301        let vote_info = self.votes.entry(voted_block.reference()).or_default();
302        if vote_info.block.is_some() {
303            // Input block has already been processed and added to the state.
304            return vec![];
305        }
306        vote_info.block = Some(voted_block.clone());
307        vote_info.own_reject_txn_votes = reject_txn_votes;
308
309        let mut certified_blocks = vec![];
310
311        let now = self.context.clock.timestamp_utc_ms();
312
313        // Update reject votes from the input block.
314        for block_votes in voted_block.transaction_votes() {
315            if block_votes.block_ref.round <= self.gc_round {
316                // Block is outside of GC bound.
317                continue;
318            }
319            let vote_info = self.votes.entry(block_votes.block_ref).or_default();
320            for reject in &block_votes.rejects {
321                vote_info
322                    .reject_txn_votes
323                    .entry(*reject)
324                    .or_default()
325                    .add_unique(voted_block.author(), &self.context.committee);
326            }
327            // Check if the target block is now certified after including the reject votes.
328            // NOTE: votes can already exist for the target block and its transactions.
329            if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
330                let authority_name = self
331                    .context
332                    .committee
333                    .authority(certified_block.block.author())
334                    .hostname
335                    .clone();
336                self.context
337                    .metrics
338                    .node_metrics
339                    .certifier_block_latency
340                    .with_label_values(&[&authority_name])
341                    .observe(
342                        Duration::from_millis(
343                            now.saturating_sub(certified_block.block.timestamp_ms()),
344                        )
345                        .as_secs_f64(),
346                    );
347                certified_blocks.push(certified_block);
348            }
349        }
350
351        certified_blocks
352    }
353
354    fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
355        if proposed_block.round() <= self.gc_round + 2 {
356            // Skip if transactions that can be certified have already been GC'ed.
357            // Skip also when the proposed block has been GC'ed from the certifier state.
358            // This is possible because this function (add_proposed_block()) is async from
359            // commit finalization, which advances the GC round of the certifier.
360            return vec![];
361        }
362        debug!(
363            "Adding proposed block {}; gc round: {}",
364            proposed_block.reference(),
365            self.gc_round
366        );
367
368        if !self.votes.contains_key(&proposed_block.reference()) {
369            self.context
370                .metrics
371                .node_metrics
372                .certifier_missing_ancestor_during_certification
373                .with_label_values(&["proposed_block_not_found"])
374                .inc();
375            debug!(
376                "Proposed block {} not found in certifier state. GC round: {}",
377                proposed_block.reference(),
378                self.gc_round,
379            );
380            return vec![];
381        }
382
383        let now = self.context.clock.timestamp_utc_ms();
384
385        // Certify transactions based on the accept votes from the proposed block's parents.
386        // Some ancestor blocks may not be found, because either they have been GC'ed due to timing
387        // issues or they were not recovered. It is ok to skip certifying blocks, which are best effort.
388        let mut certified_blocks = vec![];
389        for voting_ancestor in proposed_block.ancestors() {
390            // Votes are limited to 1 round before the proposed block.
391            if voting_ancestor.round + 1 != proposed_block.round() {
392                continue;
393            }
394            let Some(voting_info) = self.votes.get(voting_ancestor) else {
395                self.context
396                    .metrics
397                    .node_metrics
398                    .certifier_missing_ancestor_during_certification
399                    .with_label_values(&["voting_info_not_found"])
400                    .inc();
401                debug!(
402                    "Proposed block {}: voting info not found for ancestor {}",
403                    proposed_block.reference(),
404                    voting_ancestor
405                );
406                continue;
407            };
408            let Some(voting_block) = voting_info.block.clone() else {
409                self.context
410                    .metrics
411                    .node_metrics
412                    .certifier_missing_ancestor_during_certification
413                    .with_label_values(&["voting_block_not_found"])
414                    .inc();
415                debug!(
416                    "Proposed block {}: voting block not found for ancestor {}",
417                    proposed_block.reference(),
418                    voting_ancestor
419                );
420                continue;
421            };
422            for target_ancestor in voting_block.ancestors() {
423                // Target blocks are 1 round before the voting block.
424                if target_ancestor.round + 1 != voting_block.round() {
425                    continue;
426                }
427                let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
428                    self.context
429                        .metrics
430                        .node_metrics
431                        .certifier_missing_ancestor_during_certification
432                        .with_label_values(&["target_vote_info_not_found"])
433                        .inc();
434                    debug!(
435                        "Proposed block {}: target voting info not found for ancestor {}",
436                        proposed_block.reference(),
437                        target_ancestor
438                    );
439                    continue;
440                };
441                target_vote_info
442                    .accept_block_votes
443                    .add_unique(voting_block.author(), &self.context.committee);
444                // Check if the target block is now certified after including the accept votes.
445                if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
446                {
447                    let authority_name = self
448                        .context
449                        .committee
450                        .authority(certified_block.block.author())
451                        .hostname
452                        .clone();
453                    self.context
454                        .metrics
455                        .node_metrics
456                        .certifier_block_latency
457                        .with_label_values(&[&authority_name])
458                        .observe(
459                            Duration::from_millis(
460                                now.saturating_sub(certified_block.block.timestamp_ms()),
461                            )
462                            .as_secs_f64(),
463                        );
464                    certified_blocks.push(certified_block);
465                }
466            }
467        }
468
469        if !certified_blocks.is_empty() {
470            self.context
471                .metrics
472                .node_metrics
473                .certifier_output_blocks
474                .with_label_values(&["proposed"])
475                .inc_by(certified_blocks.len() as u64);
476        }
477
478        certified_blocks
479    }
480
481    /// Updates the GC round and cleans up obsolete internal state.
482    fn update_gc_round(&mut self, gc_round: Round) {
483        self.gc_round = gc_round;
484        while let Some((block_ref, _)) = self.votes.first_key_value() {
485            if block_ref.round <= self.gc_round {
486                self.votes.pop_first();
487            } else {
488                break;
489            }
490        }
491
492        self.context
493            .metrics
494            .node_metrics
495            .certifier_gc_round
496            .set(self.gc_round as i64);
497    }
498}
499
/// VoteInfo keeps track of votes received for each transaction of this block,
/// possibly even before the block is received by this authority.
struct VoteInfo {
    // Content of the block.
    // None if the block has not been received.
    block: Option<VerifiedBlock>,
    // Rejection votes by this authority on this block.
    // This field is written when the block is first received and its transactions are voted on.
    // It is read from core after the block is accepted.
    own_reject_txn_votes: Vec<TransactionIndex>,
    // Accumulates implicit accept votes for the block and all transactions.
    accept_block_votes: StakeAggregator<QuorumThreshold>,
    // Accumulates reject votes per transaction in this block.
    // A transaction index without an entry has received no reject votes.
    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
    // Whether this block has been certified already.
    // Once set, the certified output is not produced again.
    is_certified: bool,
}
517
518impl VoteInfo {
519    // If this block can now be certified, returns the output.
520    // Otherwise, returns None.
521    fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
522        let committee = &context.committee;
523        if self.is_certified {
524            // Skip if already certified.
525            return None;
526        }
527        let Some(block) = self.block.as_ref() else {
528            // Skip if the content of the block has not been received.
529            return None;
530        };
531
532        let peer_hostname = &committee.authority(block.author()).hostname;
533
534        if !self.accept_block_votes.reached_threshold(committee) {
535            // Skip if the block is not certified.
536            return None;
537        }
538        let mut rejected = vec![];
539        for (idx, reject_txn_votes) in &self.reject_txn_votes {
540            // The transaction is voted to be rejected.
541            if reject_txn_votes.reached_threshold(committee) {
542                context
543                    .metrics
544                    .node_metrics
545                    .certifier_rejected_transactions
546                    .with_label_values(&[peer_hostname])
547                    .inc();
548                rejected.push(*idx);
549                continue;
550            }
551            // If a transaction does not have a quorum of accept votes minus the reject votes,
552            // it is neither rejected nor certified. In this case the whole block cannot
553            // be considered as certified.
554
555            // accept_block_votes can be < reject_txn_votes on the transaction when reject_txn_votes
556            // come from blocks more than 1 round higher, which do not add to the
557            // accept votes of the block.
558            //
559            // Also, the total accept votes of a transactions is undercounted here.
560            // If a block has accept votes from a quorum of authorities A, B and C, but one transaction
561            // has a reject vote from D, the transaction and block are technically certified
562            // and can be sent to fastpath. However, the computation here will not certify the transaction
563            // or the block. This is still fine because the fastpath certification is optional.
564            // The definite status of the transaction will be decided during post commit finalization.
565            if self
566                .accept_block_votes
567                .stake()
568                .saturating_sub(reject_txn_votes.stake())
569                < committee.quorum_threshold()
570            {
571                return None;
572            }
573        }
574        // The block is certified.
575        let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
576        tracing::trace!(
577            "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
578            block.reference(),
579            rejected.len()
580        );
581        context
582            .metrics
583            .node_metrics
584            .certifier_accepted_transactions
585            .with_label_values(&[peer_hostname])
586            .inc_by(accepted_txn_count as u64);
587        self.is_certified = true;
588        Some(CertifiedBlock {
589            block: block.clone(),
590            rejected,
591        })
592    }
593}
594
595impl Default for VoteInfo {
596    fn default() -> Self {
597        Self {
598            block: None,
599            own_reject_txn_votes: vec![],
600            accept_block_votes: StakeAggregator::new(),
601            reject_txn_votes: BTreeMap::new(),
602            is_certified: false,
603        }
604    }
605}
606
607#[cfg(test)]
608mod test {
609    use consensus_config::AuthorityIndex;
610    use itertools::Itertools;
611    use rand::seq::SliceRandom as _;
612
613    use crate::{
614        TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
615        test_dag_builder::DagBuilder,
616    };
617
618    use super::*;
619
620    #[tokio::test]
621    async fn test_vote_info_basic() {
622        telemetry_subscribers::init_for_testing();
623        let (context, _) = Context::new_for_test(7);
624        let committee = &context.committee;
625
626        // No accept votes.
627        {
628            let mut vote_info = VoteInfo::default();
629            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
630            vote_info.block = Some(block.clone());
631
632            assert!(vote_info.take_certified_output(&context).is_none());
633        }
634
635        // Accept votes but not enough.
636        {
637            let mut vote_info = VoteInfo::default();
638            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
639            vote_info.block = Some(block.clone());
640            for i in 0..4 {
641                vote_info
642                    .accept_block_votes
643                    .add_unique(AuthorityIndex::new_for_test(i), committee);
644            }
645
646            assert!(vote_info.take_certified_output(&context).is_none());
647        }
648
649        // Enough accept votes but no block.
650        {
651            let mut vote_info = VoteInfo::default();
652            for i in 0..5 {
653                vote_info
654                    .accept_block_votes
655                    .add_unique(AuthorityIndex::new_for_test(i), committee);
656            }
657
658            assert!(vote_info.take_certified_output(&context).is_none());
659        }
660
661        // A quorum of accept votes and block exists.
662        {
663            let mut vote_info = VoteInfo::default();
664            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
665            vote_info.block = Some(block.clone());
666            for i in 0..4 {
667                vote_info
668                    .accept_block_votes
669                    .add_unique(AuthorityIndex::new_for_test(i), committee);
670            }
671
672            // The block is not certified.
673            assert!(vote_info.take_certified_output(&context).is_none());
674
675            // Add 1 more accept vote from a different authority.
676            vote_info
677                .accept_block_votes
678                .add_unique(AuthorityIndex::new_for_test(4), committee);
679
680            // The block is now certified.
681            let certified_block = vote_info.take_certified_output(&context).unwrap();
682            assert_eq!(certified_block.block.reference(), block.reference());
683
684            // Certified block cannot be taken again.
685            assert!(vote_info.take_certified_output(&context).is_none());
686        }
687
688        // A quorum of accept and reject votes.
689        {
690            let mut vote_info = VoteInfo::default();
691            // Create a block with 7 transactions.
692            let block = VerifiedBlock::new_for_test(
693                TestBlock::new(1, 1)
694                    .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
695                    .build(),
696            );
697            vote_info.block = Some(block.clone());
698            // Add 5 accept votes which form a quorum.
699            for i in 0..5 {
700                vote_info
701                    .accept_block_votes
702                    .add_unique(AuthorityIndex::new_for_test(i), committee);
703            }
704            // For transactions 3 - 7 ..
705            for reject_tx_idx in 3..8 {
706                vote_info
707                    .reject_txn_votes
708                    .insert(reject_tx_idx, StakeAggregator::new());
709                // .. add 5 reject votes which form a quorum.
710                for authority_idx in 0..5 {
711                    vote_info
712                        .reject_txn_votes
713                        .get_mut(&reject_tx_idx)
714                        .unwrap()
715                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
716                }
717            }
718
719            // The block is certified.
720            let certified_block = vote_info.take_certified_output(&context).unwrap();
721            assert_eq!(certified_block.block.reference(), block.reference());
722
723            // Certified block cannot be taken again.
724            assert!(vote_info.take_certified_output(&context).is_none());
725        }
726
727        // A transaction in the block is neither rejected nor certified.
728        {
729            let mut vote_info = VoteInfo::default();
730            // Create a block with 6 transactions.
731            let block = VerifiedBlock::new_for_test(
732                TestBlock::new(1, 1)
733                    .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
734                    .build(),
735            );
736            vote_info.block = Some(block.clone());
737            // Add 5 accept votes which form a quorum.
738            for i in 0..5 {
739                vote_info
740                    .accept_block_votes
741                    .add_unique(AuthorityIndex::new_for_test(i), committee);
742            }
743            // For transactions 3 - 5 ..
744            for reject_tx_idx in 3..6 {
745                vote_info
746                    .reject_txn_votes
747                    .insert(reject_tx_idx, StakeAggregator::new());
748                // .. add 5 reject votes which form a quorum.
749                for authority_idx in 0..5 {
750                    vote_info
751                        .reject_txn_votes
752                        .get_mut(&reject_tx_idx)
753                        .unwrap()
754                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
755                }
756            }
            // For transaction 5 (the last one), replace the aggregator filled above with a fresh
            // one holding only 4 reject votes, which do not form a quorum.
758            vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
759            for authority_idx in 0..4 {
760                vote_info
761                    .reject_txn_votes
762                    .get_mut(&5)
763                    .unwrap()
764                    .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
765            }
766
767            // The block is not certified.
768            assert!(vote_info.take_certified_output(&context).is_none());
769
            // Add 1 more reject vote from a different authority (index 4) for the same transaction.
771            vote_info
772                .reject_txn_votes
773                .get_mut(&5)
774                .unwrap()
775                .add_unique(AuthorityIndex::new_for_test(4), committee);
776
777            // The block is now certified.
778            let certified_block = vote_info.take_certified_output(&context).unwrap();
779            assert_eq!(certified_block.block.reference(), block.reference());
780
781            // Certified block cannot be taken again.
782            assert!(vote_info.take_certified_output(&context).is_none());
783        }
784    }
785
786    #[tokio::test]
787    async fn test_certify_basic() {
788        telemetry_subscribers::init_for_testing();
789        let (context, _) = Context::new_for_test(4);
790        let context = Arc::new(context);
791
792        // GIVEN: Round 1: blocks from all authorities are fully connected to the genesis blocks.
793        let mut dag_builder = DagBuilder::new(context.clone());
794        dag_builder.layer(1).num_transactions(4).build();
795        let round_1_blocks = dag_builder.all_blocks();
796        let mut all_blocks = round_1_blocks.clone();
797
798        // THEN: Round 1: no block can be certified yet.
799        let mut certifier = CertifierState::new(context.clone());
800        let certified_blocks = certifier
801            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
802        assert!(certified_blocks.is_empty());
803
804        // GIVEN: Round 2: A, B & C blocks at round 2 are connected to only A, B & C blocks at round 1.
805        // AND: A & B blocks reject transaction 1 from the round 1 B block.
806        // AND: A, B & C blocks reject transaction 2 from the round 1 C block.
807        let transactions = (0..4)
808            .map(|_| Transaction::new(vec![0_u8; 16]))
809            .collect::<Vec<_>>();
810        let ancestors = round_1_blocks
811            .iter()
812            .filter_map(|b| {
813                if b.author().value() < 3 {
814                    Some(b.reference())
815                } else {
816                    None
817                }
818            })
819            .collect::<Vec<_>>();
820        for author in 0..3 {
821            let mut block = TestBlock::new(2, author)
822                .set_ancestors(ancestors.clone())
823                .set_transactions(transactions.clone());
824            let mut votes = vec![];
825            for i in 0..(3 - author) {
826                let j = author + i;
827                if j == 0 {
828                    // Do not reject transaction 0 from the round 1 A block.
829                    continue;
830                }
831                votes.push(BlockTransactionVotes {
832                    block_ref: round_1_blocks[j as usize].reference(),
833                    rejects: vec![j as u16],
834                });
835            }
836            block = block.set_transaction_votes(votes);
837            all_blocks.push(VerifiedBlock::new_for_test(block.build()));
838        }
839
840        // THEN: Round 2: no block can be certified yet.
841        let mut certifier = CertifierState::new(context.clone());
842        let certified_blocks =
843            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
844        assert!(certified_blocks.is_empty());
845
846        // GIVEN: Round 3: all blocks connected to round 2 blocks and round 1 D block,
847        let ancestors = all_blocks
848            .iter()
849            .filter_map(|b| {
850                if b.round() == 1 && b.author().value() == 3 {
851                    Some(b.reference())
852                } else if b.round() == 2 {
853                    assert_ne!(b.author().value(), 3);
854                    Some(b.reference())
855                } else {
856                    None
857                }
858            })
859            .collect::<Vec<_>>();
860        assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
861        let mut round_3_blocks = vec![];
862        for author in 0..4 {
863            let block = TestBlock::new(3, author)
864                .set_ancestors(ancestors.clone())
865                .set_transactions(transactions.clone());
866            round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
867        }
868
869        // THEN: Round 3: with 1 round 3 block, A & C round 1 blocks are certified.
870        let mut certifier = CertifierState::new(context.clone());
871        certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
872        let proposed_block = round_3_blocks.pop().unwrap();
873        let mut certified_blocks =
874            certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
875        certified_blocks.extend(certifier.add_proposed_block(proposed_block));
876        assert_eq!(
877            certified_blocks.len(),
878            2,
879            "Certified blocks {:?}",
880            certified_blocks
881                .iter()
882                .map(|b| b.block.reference().to_string())
883                .join(",")
884        );
885        assert_eq!(
886            certified_blocks[0].block.reference(),
887            round_1_blocks[0].reference()
888        );
889        assert!(certified_blocks[0].rejected.is_empty());
890        assert_eq!(
891            certified_blocks[1].block.reference(),
892            round_1_blocks[2].reference()
893        );
894        assert_eq!(certified_blocks[1].rejected, vec![2]);
895    }
896
897    // TODO: add reject votes.
898    #[tokio::test]
899    async fn test_certify_randomized() {
900        telemetry_subscribers::init_for_testing();
901        let num_authorities: u32 = 7;
902        let (context, _) = Context::new_for_test(num_authorities as usize);
903        let context = Arc::new(context);
904
905        // Create minimal connected blocks up to num_rounds.
906        let num_rounds = 50;
907        let mut dag_builder = DagBuilder::new(context.clone());
908        dag_builder
909            .layers(1..=num_rounds)
910            .min_ancestor_links(false, None)
911            .build();
912        let all_blocks = dag_builder.all_blocks();
913
914        // Get the certified blocks, which depends on the structure of the minimum connected DAG.
915        let mut certifier = CertifierState::new(context.clone());
916        let mut expected_certified_blocks =
917            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
918        expected_certified_blocks.sort_by_key(|b| b.block.reference());
919
920        // Adding all blocks to certifier in random order should still produce the same set of certified blocks.
921        for _ in 0..100 {
922            // Add the blocks to certifier in random order.
923            let mut all_blocks = all_blocks.clone();
924            all_blocks.shuffle(&mut rand::thread_rng());
925            let mut certifier = CertifierState::new(context.clone());
926
927            // Take the certified blocks.
928            let mut actual_certified_blocks = certifier
929                .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
930            actual_certified_blocks.sort_by_key(|b| b.block.reference());
931
932            // Ensure the certified blocks are the expected ones.
933            assert_eq!(
934                actual_certified_blocks.len(),
935                expected_certified_blocks.len()
936            );
937            for (actual, expected) in actual_certified_blocks
938                .iter()
939                .zip(expected_certified_blocks.iter())
940            {
941                assert_eq!(actual.block.reference(), expected.block.reference());
942                assert_eq!(actual.rejected, expected.rejected);
943            }
944        }
945    }
946}