consensus_core/
transaction_certifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_metrics::monitored_mpsc::UnboundedSender;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13    BlockAPI as _, CertifiedBlock, CertifiedBlocksOutput, VerifiedBlock,
14    block::{BlockTransactionVotes, GENESIS_ROUND},
15    block_verifier::BlockVerifier,
16    context::Context,
17    dag_state::DagState,
18    stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21/// TransactionCertifier has the following purposes:
22/// 1. Certifies transactions and sends them to execute on the fastpath.
23/// 2. Keeps track of own votes on transactions, and allows the votes to be retrieved
24///    later in core after acceptance of the blocks containing the transactions.
25/// 3. Aggregates reject votes on transactions, and allows the aggregated votes
26///    to be retrieved during post-commit finalization.
27///
28/// A transaction is certified if a quorum of authorities in the causal history of a proposed block
29/// vote to accept the transaction. Accept votes are implicit in blocks: if a transaction is in
30/// the causal history of a block and the block does not vote to reject it, the block
31/// is considered to vote to accept the transaction. Transaction finalization are eventually resolved
32/// post commit, by checking if there is a certification of the transaction in the causal history
33/// of the leader. So only accept votes are only considered if they are in the causal history of own
34/// proposed blocks.
35///
36/// A transaction is rejected if a quorum of authorities vote to reject it. When this happens, it is
37/// guaranteed that no validator can observe a certification of the transaction, with <= f malicious
38/// stake.
39///
40/// A block is certified if every transaction in the block is either certified or rejected.
41/// TransactionCertifier outputs certified blocks.
42///
43/// The invariant between TransactionCertifier and post-commit finalization is that if a quorum of
44/// authorities certified a transaction for fastpath and executed it, then the transaction
45/// must also be finalized post consensus commit. The reverse is not true though, because
46/// fastpath execution is only a latency optimization, and not required for correctness.
47#[derive(Clone)]
48pub struct TransactionCertifier {
49    // The state of blocks being voted on and certified.
50    certifier_state: Arc<RwLock<CertifierState>>,
51    // Verify transactions during recovery.
52    block_verifier: Arc<dyn BlockVerifier>,
53    // The state of the DAG.
54    dag_state: Arc<RwLock<DagState>>,
55    // An unbounded channel to output certified blocks to Sui consensus block handler.
56    certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
57}
58
59impl TransactionCertifier {
60    pub fn new(
61        context: Arc<Context>,
62        block_verifier: Arc<dyn BlockVerifier>,
63        dag_state: Arc<RwLock<DagState>>,
64        certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
65    ) -> Self {
66        Self {
67            certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
68            block_verifier,
69            dag_state,
70            certified_blocks_sender,
71        }
72    }
73
74    /// Recovers all blocks from DB after the given round.
75    ///
76    /// This is useful for initializing the certifier state
77    /// for future commits and block proposals.
78    pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
79        let context = self.certifier_state.read().context.clone();
80        if !context.protocol_config.mysticeti_fastpath() {
81            info!("Skipping certifier recovery in non-mysticeti fast path mode");
82            return;
83        }
84
85        let store = self.dag_state.read().store().clone();
86
87        let recovery_start_round = after_round + 1;
88        info!(
89            "Recovering certifier state from round {}",
90            recovery_start_round,
91        );
92
93        let authorities = context
94            .committee
95            .authorities()
96            .map(|(index, _)| index)
97            .collect::<Vec<_>>();
98        for authority_index in authorities {
99            let blocks = store
100                .scan_blocks_by_author(authority_index, recovery_start_round)
101                .unwrap();
102            info!(
103                "Recovered and voting on {} blocks from authority {} {}",
104                blocks.len(),
105                authority_index,
106                context.committee.authority(authority_index).hostname
107            );
108            self.recover_and_vote_on_blocks(blocks);
109        }
110    }
111
112    /// Recovers and potentially votes on the given blocks.
113    ///
114    /// Because own votes on blocks are not stored, during recovery it is necessary to vote on
115    /// input blocks that are above GC round and have not been included before, which can be
116    /// included in a future proposed block.
117    ///
118    /// In addition, add_voted_blocks() will eventually process reject votes contained in the input blocks.
119    pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
120        let should_vote_blocks = {
121            let dag_state = self.dag_state.read();
122            let gc_round = dag_state.gc_round();
123            blocks
124                .iter()
125                // Must make sure the block is above GC round before calling has_been_included().
126                .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
127                .collect::<Vec<_>>()
128        };
129        let voted_blocks = blocks
130            .into_iter()
131            .zip(should_vote_blocks)
132            .map(|(b, should_vote)| {
133                if !should_vote {
134                    // Voting is unnecessary for blocks already included in own proposed blocks,
135                    // or outside of local DAG GC bound.
136                    (b, vec![])
137                } else {
138                    // Voting is needed for blocks above GC round and not yet included in own proposed blocks.
139                    // A block proposal can include the input block later and retries own votes on it.
140                    let reject_transaction_votes =
141                        self.block_verifier.vote(&b).unwrap_or_else(|e| {
142                            panic!("Failed to vote on block during recovery: {}", e)
143                        });
144                    (b, reject_transaction_votes)
145                }
146            })
147            .collect::<Vec<_>>();
148        self.certifier_state.write().add_voted_blocks(voted_blocks);
149        // Do not send certified blocks to the fastpath output channel during recovery,
150        // because these transactions could have been executed and fastpath latency optimization is
151        // unnecessary for recovered transactions.
152    }
153
154    /// Stores own reject votes on input blocks, and aggregates reject votes from the input blocks.
155    /// Newly certified blocks are sent to the fastpath output channel.
156    pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
157        let certified_blocks = self.certifier_state.write().add_voted_blocks(voted_blocks);
158        self.send_certified_blocks(certified_blocks);
159    }
160
161    /// Aggregates accept votes from the own proposed block.
162    /// Newly certified blocks are sent to the fastpath output channel.
163    pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
164        let certified_blocks = self
165            .certifier_state
166            .write()
167            .add_proposed_block(proposed_block);
168        self.send_certified_blocks(certified_blocks);
169    }
170
171    // Sends certified blocks to the fastpath output channel.
172    fn send_certified_blocks(&self, certified_blocks: Vec<CertifiedBlock>) {
173        if certified_blocks.is_empty() {
174            return;
175        }
176        if let Err(e) = self.certified_blocks_sender.send(CertifiedBlocksOutput {
177            blocks: certified_blocks,
178        }) {
179            tracing::warn!("Failed to send certified blocks: {:?}", e);
180        }
181    }
182
183    /// Retrieves own votes on peer block transactions.
184    pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
185        let mut votes = vec![];
186        let certifier_state = self.certifier_state.read();
187        for block_ref in block_refs {
188            if block_ref.round <= certifier_state.gc_round {
189                continue;
190            }
191            let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
192                panic!("Ancestor block {} not found in certifier state", block_ref)
193            });
194            if !vote_info.own_reject_txn_votes.is_empty() {
195                votes.push(BlockTransactionVotes {
196                    block_ref,
197                    rejects: vote_info.own_reject_txn_votes.clone(),
198                });
199            }
200        }
201        votes
202    }
203
204    /// Retrieves transactions in the block that have received reject votes, and the total stake of the votes.
205    /// TransactionIndex not included in the output has no reject votes.
206    /// Returns None if no information is found for the block.
207    pub(crate) fn get_reject_votes(
208        &self,
209        block_ref: &BlockRef,
210    ) -> Option<Vec<(TransactionIndex, Stake)>> {
211        let accumulated_reject_votes = self
212            .certifier_state
213            .read()
214            .votes
215            .get(block_ref)?
216            .reject_txn_votes
217            .iter()
218            .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
219            .collect::<Vec<_>>();
220        Some(accumulated_reject_votes)
221    }
222
223    /// Runs garbage collection on the internal state by removing data for blocks <= gc_round,
224    /// and updates the GC round for the certifier.
225    ///
226    /// IMPORTANT: the gc_round used here can trail the latest gc_round from DagState.
227    /// This is because the gc round here is determined by CommitFinalizer, which needs to process
228    /// commits before the latest commit in DagState. Reject votes received by transactions below
229    /// local DAG gc_round may still need to be accessed from CommitFinalizer.
230    pub(crate) fn run_gc(&self, gc_round: Round) {
231        let dag_state_gc_round = self.dag_state.read().gc_round();
232        assert!(
233            gc_round <= dag_state_gc_round,
234            "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
235            gc_round,
236            dag_state_gc_round
237        );
238        self.certifier_state.write().update_gc_round(gc_round);
239    }
240}
241
242/// CertifierState keeps track of votes received by each transaction and block,
243/// and helps determine if votes reach a quorum. Reject votes can start accumulating
244/// even before the target block is received by this authority.
245struct CertifierState {
246    context: Arc<Context>,
247
248    // Maps received blocks' refs to votes on those blocks from other blocks.
249    // Even if a block has no reject votes on its transactions, it still has an entry here.
250    votes: BTreeMap<BlockRef, VoteInfo>,
251
252    // Highest round where blocks are GC'ed.
253    gc_round: Round,
254}
255
256impl CertifierState {
257    fn new(context: Arc<Context>) -> Self {
258        Self {
259            context,
260            votes: BTreeMap::new(),
261            gc_round: GENESIS_ROUND,
262        }
263    }
264
265    fn add_voted_blocks(
266        &mut self,
267        voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
268    ) -> Vec<CertifiedBlock> {
269        let mut certified_blocks = vec![];
270        for (voted_block, reject_txn_votes) in voted_blocks {
271            let blocks = self.add_voted_block(voted_block, reject_txn_votes);
272            certified_blocks.extend(blocks);
273        }
274
275        if !certified_blocks.is_empty() {
276            self.context
277                .metrics
278                .node_metrics
279                .certifier_output_blocks
280                .with_label_values(&["voted"])
281                .inc_by(certified_blocks.len() as u64);
282        }
283
284        certified_blocks
285    }
286
287    fn add_voted_block(
288        &mut self,
289        voted_block: VerifiedBlock,
290        reject_txn_votes: Vec<TransactionIndex>,
291    ) -> Vec<CertifiedBlock> {
292        if voted_block.round() <= self.gc_round {
293            // Ignore the block and own votes, since they are outside of certifier GC bound.
294            return vec![];
295        }
296
297        // Count own reject votes against each peer authority.
298        let peer_hostname = &self
299            .context
300            .committee
301            .authority(voted_block.author())
302            .hostname;
303        self.context
304            .metrics
305            .node_metrics
306            .certifier_own_reject_votes
307            .with_label_values(&[peer_hostname])
308            .inc_by(reject_txn_votes.len() as u64);
309
310        // Initialize the entry for the voted block.
311        let vote_info = self.votes.entry(voted_block.reference()).or_default();
312        if vote_info.block.is_some() {
313            // Input block has already been processed and added to the state.
314            return vec![];
315        }
316        vote_info.block = Some(voted_block.clone());
317        vote_info.own_reject_txn_votes = reject_txn_votes;
318
319        let mut certified_blocks = vec![];
320
321        let now = self.context.clock.timestamp_utc_ms();
322
323        // Update reject votes from the input block.
324        for block_votes in voted_block.transaction_votes() {
325            if block_votes.block_ref.round <= self.gc_round {
326                // Block is outside of GC bound.
327                continue;
328            }
329            let vote_info = self.votes.entry(block_votes.block_ref).or_default();
330            for reject in &block_votes.rejects {
331                vote_info
332                    .reject_txn_votes
333                    .entry(*reject)
334                    .or_default()
335                    .add_unique(voted_block.author(), &self.context.committee);
336            }
337            // Check if the target block is now certified after including the reject votes.
338            // NOTE: votes can already exist for the target block and its transactions.
339            if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
340                let authority_name = self
341                    .context
342                    .committee
343                    .authority(certified_block.block.author())
344                    .hostname
345                    .clone();
346                self.context
347                    .metrics
348                    .node_metrics
349                    .certifier_block_latency
350                    .with_label_values(&[&authority_name])
351                    .observe(
352                        Duration::from_millis(
353                            now.saturating_sub(certified_block.block.timestamp_ms()),
354                        )
355                        .as_secs_f64(),
356                    );
357                certified_blocks.push(certified_block);
358            }
359        }
360
361        certified_blocks
362    }
363
364    fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
365        if proposed_block.round() <= self.gc_round + 2 {
366            // Skip if transactions that can be certified have already been GC'ed.
367            // Skip also when the proposed block has been GC'ed from the certifier state.
368            // This is possible because this function (add_proposed_block()) is async from
369            // commit finalization, which advances the GC round of the certifier.
370            return vec![];
371        }
372        debug!(
373            "Adding proposed block {}; gc round: {}",
374            proposed_block.reference(),
375            self.gc_round
376        );
377
378        if !self.votes.contains_key(&proposed_block.reference()) {
379            self.context
380                .metrics
381                .node_metrics
382                .certifier_missing_ancestor_during_certification
383                .with_label_values(&["proposed_block_not_found"])
384                .inc();
385            debug!(
386                "Proposed block {} not found in certifier state. GC round: {}",
387                proposed_block.reference(),
388                self.gc_round,
389            );
390            return vec![];
391        }
392
393        let now = self.context.clock.timestamp_utc_ms();
394
395        // Certify transactions based on the accept votes from the proposed block's parents.
396        // Some ancestor blocks may not be found, because either they have been GC'ed due to timing
397        // issues or they were not recovered. It is ok to skip certifying blocks, which are best effort.
398        let mut certified_blocks = vec![];
399        for voting_ancestor in proposed_block.ancestors() {
400            // Votes are limited to 1 round before the proposed block.
401            if voting_ancestor.round + 1 != proposed_block.round() {
402                continue;
403            }
404            let Some(voting_info) = self.votes.get(voting_ancestor) else {
405                self.context
406                    .metrics
407                    .node_metrics
408                    .certifier_missing_ancestor_during_certification
409                    .with_label_values(&["voting_info_not_found"])
410                    .inc();
411                debug!(
412                    "Proposed block {}: voting info not found for ancestor {}",
413                    proposed_block.reference(),
414                    voting_ancestor
415                );
416                continue;
417            };
418            let Some(voting_block) = voting_info.block.clone() else {
419                self.context
420                    .metrics
421                    .node_metrics
422                    .certifier_missing_ancestor_during_certification
423                    .with_label_values(&["voting_block_not_found"])
424                    .inc();
425                debug!(
426                    "Proposed block {}: voting block not found for ancestor {}",
427                    proposed_block.reference(),
428                    voting_ancestor
429                );
430                continue;
431            };
432            for target_ancestor in voting_block.ancestors() {
433                // Target blocks are 1 round before the voting block.
434                if target_ancestor.round + 1 != voting_block.round() {
435                    continue;
436                }
437                let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
438                    self.context
439                        .metrics
440                        .node_metrics
441                        .certifier_missing_ancestor_during_certification
442                        .with_label_values(&["target_vote_info_not_found"])
443                        .inc();
444                    debug!(
445                        "Proposed block {}: target voting info not found for ancestor {}",
446                        proposed_block.reference(),
447                        target_ancestor
448                    );
449                    continue;
450                };
451                target_vote_info
452                    .accept_block_votes
453                    .add_unique(voting_block.author(), &self.context.committee);
454                // Check if the target block is now certified after including the accept votes.
455                if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
456                {
457                    let authority_name = self
458                        .context
459                        .committee
460                        .authority(certified_block.block.author())
461                        .hostname
462                        .clone();
463                    self.context
464                        .metrics
465                        .node_metrics
466                        .certifier_block_latency
467                        .with_label_values(&[&authority_name])
468                        .observe(
469                            Duration::from_millis(
470                                now.saturating_sub(certified_block.block.timestamp_ms()),
471                            )
472                            .as_secs_f64(),
473                        );
474                    certified_blocks.push(certified_block);
475                }
476            }
477        }
478
479        if !certified_blocks.is_empty() {
480            self.context
481                .metrics
482                .node_metrics
483                .certifier_output_blocks
484                .with_label_values(&["proposed"])
485                .inc_by(certified_blocks.len() as u64);
486        }
487
488        certified_blocks
489    }
490
491    /// Updates the GC round and cleans up obsolete internal state.
492    fn update_gc_round(&mut self, gc_round: Round) {
493        self.gc_round = gc_round;
494        while let Some((block_ref, _)) = self.votes.first_key_value() {
495            if block_ref.round <= self.gc_round {
496                self.votes.pop_first();
497            } else {
498                break;
499            }
500        }
501
502        self.context
503            .metrics
504            .node_metrics
505            .certifier_gc_round
506            .set(self.gc_round as i64);
507    }
508}
509
510/// VoteInfo keeps track of votes received for each transaction of this block,
511/// possibly even before the block is received by this authority.
512struct VoteInfo {
513    // Content of the block.
514    // None if the blocks has not been received.
515    block: Option<VerifiedBlock>,
516    // Rejection votes by this authority on this block.
517    // This field is written when the block is first received and its transactions are voted on.
518    // It is read from core after the block is accepted.
519    own_reject_txn_votes: Vec<TransactionIndex>,
520    // Accumulates implicit accept votes for the block and all transactions.
521    accept_block_votes: StakeAggregator<QuorumThreshold>,
522    // Accumulates reject votes per transaction in this block.
523    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
524    // Whether this block has been certified already.
525    is_certified: bool,
526}
527
528impl VoteInfo {
529    // If this block can now be certified, returns the output.
530    // Otherwise, returns None.
531    fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
532        let committee = &context.committee;
533        if self.is_certified {
534            // Skip if already certified.
535            return None;
536        }
537        let Some(block) = self.block.as_ref() else {
538            // Skip if the content of the block has not been received.
539            return None;
540        };
541
542        let peer_hostname = &committee.authority(block.author()).hostname;
543
544        if !self.accept_block_votes.reached_threshold(committee) {
545            // Skip if the block is not certified.
546            return None;
547        }
548        let mut rejected = vec![];
549        for (idx, reject_txn_votes) in &self.reject_txn_votes {
550            // The transaction is voted to be rejected.
551            if reject_txn_votes.reached_threshold(committee) {
552                context
553                    .metrics
554                    .node_metrics
555                    .certifier_rejected_transactions
556                    .with_label_values(&[peer_hostname])
557                    .inc();
558                rejected.push(*idx);
559                continue;
560            }
561            // If a transaction does not have a quorum of accept votes minus the reject votes,
562            // it is neither rejected nor certified. In this case the whole block cannot
563            // be considered as certified.
564
565            // accept_block_votes can be < reject_txn_votes on the transaction when reject_txn_votes
566            // come from blocks more than 1 round higher, which do not add to the
567            // accept votes of the block.
568            //
569            // Also, the total accept votes of a transactions is undercounted here.
570            // If a block has accept votes from a quorum of authorities A, B and C, but one transaction
571            // has a reject vote from D, the transaction and block are technically certified
572            // and can be sent to fastpath. However, the computation here will not certify the transaction
573            // or the block. This is still fine because the fastpath certification is optional.
574            // The definite status of the transaction will be decided during post commit finalization.
575            if self
576                .accept_block_votes
577                .stake()
578                .saturating_sub(reject_txn_votes.stake())
579                < committee.quorum_threshold()
580            {
581                return None;
582            }
583        }
584        // The block is certified.
585        let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
586        tracing::trace!(
587            "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
588            block.reference(),
589            rejected.len()
590        );
591        context
592            .metrics
593            .node_metrics
594            .certifier_accepted_transactions
595            .with_label_values(&[peer_hostname])
596            .inc_by(accepted_txn_count as u64);
597        self.is_certified = true;
598        Some(CertifiedBlock {
599            block: block.clone(),
600            rejected,
601        })
602    }
603}
604
605impl Default for VoteInfo {
606    fn default() -> Self {
607        Self {
608            block: None,
609            own_reject_txn_votes: vec![],
610            accept_block_votes: StakeAggregator::new(),
611            reject_txn_votes: BTreeMap::new(),
612            is_certified: false,
613        }
614    }
615}
616
617#[cfg(test)]
618mod test {
619    use consensus_config::AuthorityIndex;
620    use itertools::Itertools;
621    use rand::seq::SliceRandom as _;
622
623    use crate::{
624        TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
625        test_dag_builder::DagBuilder,
626    };
627
628    use super::*;
629
630    #[tokio::test]
631    async fn test_vote_info_basic() {
632        telemetry_subscribers::init_for_testing();
633        let (context, _) = Context::new_for_test(7);
634        let committee = &context.committee;
635
636        // No accept votes.
637        {
638            let mut vote_info = VoteInfo::default();
639            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
640            vote_info.block = Some(block.clone());
641
642            assert!(vote_info.take_certified_output(&context).is_none());
643        }
644
645        // Accept votes but not enough.
646        {
647            let mut vote_info = VoteInfo::default();
648            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
649            vote_info.block = Some(block.clone());
650            for i in 0..4 {
651                vote_info
652                    .accept_block_votes
653                    .add_unique(AuthorityIndex::new_for_test(i), committee);
654            }
655
656            assert!(vote_info.take_certified_output(&context).is_none());
657        }
658
659        // Enough accept votes but no block.
660        {
661            let mut vote_info = VoteInfo::default();
662            for i in 0..5 {
663                vote_info
664                    .accept_block_votes
665                    .add_unique(AuthorityIndex::new_for_test(i), committee);
666            }
667
668            assert!(vote_info.take_certified_output(&context).is_none());
669        }
670
671        // A quorum of accept votes and block exists.
672        {
673            let mut vote_info = VoteInfo::default();
674            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
675            vote_info.block = Some(block.clone());
676            for i in 0..4 {
677                vote_info
678                    .accept_block_votes
679                    .add_unique(AuthorityIndex::new_for_test(i), committee);
680            }
681
682            // The block is not certified.
683            assert!(vote_info.take_certified_output(&context).is_none());
684
685            // Add 1 more accept vote from a different authority.
686            vote_info
687                .accept_block_votes
688                .add_unique(AuthorityIndex::new_for_test(4), committee);
689
690            // The block is now certified.
691            let certified_block = vote_info.take_certified_output(&context).unwrap();
692            assert_eq!(certified_block.block.reference(), block.reference());
693
694            // Certified block cannot be taken again.
695            assert!(vote_info.take_certified_output(&context).is_none());
696        }
697
698        // A quorum of accept and reject votes.
699        {
700            let mut vote_info = VoteInfo::default();
701            // Create a block with 7 transactions.
702            let block = VerifiedBlock::new_for_test(
703                TestBlock::new(1, 1)
704                    .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
705                    .build(),
706            );
707            vote_info.block = Some(block.clone());
708            // Add 5 accept votes which form a quorum.
709            for i in 0..5 {
710                vote_info
711                    .accept_block_votes
712                    .add_unique(AuthorityIndex::new_for_test(i), committee);
713            }
714            // For transactions 3 - 7 ..
715            for reject_tx_idx in 3..8 {
716                vote_info
717                    .reject_txn_votes
718                    .insert(reject_tx_idx, StakeAggregator::new());
719                // .. add 5 reject votes which form a quorum.
720                for authority_idx in 0..5 {
721                    vote_info
722                        .reject_txn_votes
723                        .get_mut(&reject_tx_idx)
724                        .unwrap()
725                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
726                }
727            }
728
729            // The block is certified.
730            let certified_block = vote_info.take_certified_output(&context).unwrap();
731            assert_eq!(certified_block.block.reference(), block.reference());
732
733            // Certified block cannot be taken again.
734            assert!(vote_info.take_certified_output(&context).is_none());
735        }
736
737        // A transaction in the block is neither rejected nor certified.
738        {
739            let mut vote_info = VoteInfo::default();
740            // Create a block with 6 transactions.
741            let block = VerifiedBlock::new_for_test(
742                TestBlock::new(1, 1)
743                    .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
744                    .build(),
745            );
746            vote_info.block = Some(block.clone());
747            // Add 5 accept votes which form a quorum.
748            for i in 0..5 {
749                vote_info
750                    .accept_block_votes
751                    .add_unique(AuthorityIndex::new_for_test(i), committee);
752            }
753            // For transactions 3 - 5 ..
754            for reject_tx_idx in 3..6 {
755                vote_info
756                    .reject_txn_votes
757                    .insert(reject_tx_idx, StakeAggregator::new());
758                // .. add 5 reject votes which form a quorum.
759                for authority_idx in 0..5 {
760                    vote_info
761                        .reject_txn_votes
762                        .get_mut(&reject_tx_idx)
763                        .unwrap()
764                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
765                }
766            }
767            // For transaction 6, add 4 reject votes which do not form a quorum.
768            vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
769            for authority_idx in 0..4 {
770                vote_info
771                    .reject_txn_votes
772                    .get_mut(&5)
773                    .unwrap()
774                    .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
775            }
776
777            // The block is not certified.
778            assert!(vote_info.take_certified_output(&context).is_none());
779
780            // Add 1 more accept vote from a different authority for transaction 6.
781            vote_info
782                .reject_txn_votes
783                .get_mut(&5)
784                .unwrap()
785                .add_unique(AuthorityIndex::new_for_test(4), committee);
786
787            // The block is now certified.
788            let certified_block = vote_info.take_certified_output(&context).unwrap();
789            assert_eq!(certified_block.block.reference(), block.reference());
790
791            // Certified block cannot be taken again.
792            assert!(vote_info.take_certified_output(&context).is_none());
793        }
794    }
795
796    #[tokio::test]
797    async fn test_certify_basic() {
798        telemetry_subscribers::init_for_testing();
799        let (context, _) = Context::new_for_test(4);
800        let context = Arc::new(context);
801
802        // GIVEN: Round 1: blocks from all authorities are fully connected to the genesis blocks.
803        let mut dag_builder = DagBuilder::new(context.clone());
804        dag_builder.layer(1).num_transactions(4).build();
805        let round_1_blocks = dag_builder.all_blocks();
806        let mut all_blocks = round_1_blocks.clone();
807
808        // THEN: Round 1: no block can be certified yet.
809        let mut certifier = CertifierState::new(context.clone());
810        let certified_blocks = certifier
811            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
812        assert!(certified_blocks.is_empty());
813
814        // GIVEN: Round 2: A, B & C blocks at round 2 are connected to only A, B & C blocks at round 1.
815        // AND: A & B blocks reject transaction 1 from the round 1 B block.
816        // AND: A, B & C blocks reject transaction 2 from the round 1 C block.
817        let transactions = (0..4)
818            .map(|_| Transaction::new(vec![0_u8; 16]))
819            .collect::<Vec<_>>();
820        let ancestors = round_1_blocks
821            .iter()
822            .filter_map(|b| {
823                if b.author().value() < 3 {
824                    Some(b.reference())
825                } else {
826                    None
827                }
828            })
829            .collect::<Vec<_>>();
830        for author in 0..3 {
831            let mut block = TestBlock::new(2, author)
832                .set_ancestors(ancestors.clone())
833                .set_transactions(transactions.clone());
834            let mut votes = vec![];
835            for i in 0..(3 - author) {
836                let j = author + i;
837                if j == 0 {
838                    // Do not reject transaction 0 from the round 1 A block.
839                    continue;
840                }
841                votes.push(BlockTransactionVotes {
842                    block_ref: round_1_blocks[j as usize].reference(),
843                    rejects: vec![j as u16],
844                });
845            }
846            block = block.set_transaction_votes(votes);
847            all_blocks.push(VerifiedBlock::new_for_test(block.build()));
848        }
849
850        // THEN: Round 2: no block can be certified yet.
851        let mut certifier = CertifierState::new(context.clone());
852        let certified_blocks =
853            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
854        assert!(certified_blocks.is_empty());
855
856        // GIVEN: Round 3: all blocks connected to round 2 blocks and round 1 D block,
857        let ancestors = all_blocks
858            .iter()
859            .filter_map(|b| {
860                if b.round() == 1 && b.author().value() == 3 {
861                    Some(b.reference())
862                } else if b.round() == 2 {
863                    assert_ne!(b.author().value(), 3);
864                    Some(b.reference())
865                } else {
866                    None
867                }
868            })
869            .collect::<Vec<_>>();
870        assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
871        let mut round_3_blocks = vec![];
872        for author in 0..4 {
873            let block = TestBlock::new(3, author)
874                .set_ancestors(ancestors.clone())
875                .set_transactions(transactions.clone());
876            round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
877        }
878
879        // THEN: Round 3: with 1 round 3 block, A & C round 1 blocks are certified.
880        let mut certifier = CertifierState::new(context.clone());
881        certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
882        let proposed_block = round_3_blocks.pop().unwrap();
883        let mut certified_blocks =
884            certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
885        certified_blocks.extend(certifier.add_proposed_block(proposed_block));
886        assert_eq!(
887            certified_blocks.len(),
888            2,
889            "Certified blocks {:?}",
890            certified_blocks
891                .iter()
892                .map(|b| b.block.reference().to_string())
893                .join(",")
894        );
895        assert_eq!(
896            certified_blocks[0].block.reference(),
897            round_1_blocks[0].reference()
898        );
899        assert!(certified_blocks[0].rejected.is_empty());
900        assert_eq!(
901            certified_blocks[1].block.reference(),
902            round_1_blocks[2].reference()
903        );
904        assert_eq!(certified_blocks[1].rejected, vec![2]);
905    }
906
907    // TODO: add reject votes.
908    #[tokio::test]
909    async fn test_certify_randomized() {
910        telemetry_subscribers::init_for_testing();
911        let num_authorities: u32 = 7;
912        let (context, _) = Context::new_for_test(num_authorities as usize);
913        let context = Arc::new(context);
914
915        // Create minimal connected blocks up to num_rounds.
916        let num_rounds = 50;
917        let mut dag_builder = DagBuilder::new(context.clone());
918        dag_builder
919            .layers(1..=num_rounds)
920            .min_ancestor_links(false, None)
921            .build();
922        let all_blocks = dag_builder.all_blocks();
923
924        // Get the certified blocks, which depends on the structure of the minimum connected DAG.
925        let mut certifier = CertifierState::new(context.clone());
926        let mut expected_certified_blocks =
927            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
928        expected_certified_blocks.sort_by_key(|b| b.block.reference());
929
930        // Adding all blocks to certifier in random order should still produce the same set of certified blocks.
931        for _ in 0..100 {
932            // Add the blocks to certifier in random order.
933            let mut all_blocks = all_blocks.clone();
934            all_blocks.shuffle(&mut rand::thread_rng());
935            let mut certifier = CertifierState::new(context.clone());
936
937            // Take the certified blocks.
938            let mut actual_certified_blocks = certifier
939                .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
940            actual_certified_blocks.sort_by_key(|b| b.block.reference());
941
942            // Ensure the certified blocks are the expected ones.
943            assert_eq!(
944                actual_certified_blocks.len(),
945                expected_certified_blocks.len()
946            );
947            for (actual, expected) in actual_certified_blocks
948                .iter()
949                .zip(expected_certified_blocks.iter())
950            {
951                assert_eq!(actual.block.reference(), expected.block.reference());
952                assert_eq!(actual.rejected, expected.rejected);
953            }
954        }
955    }
956}