consensus_core/
transaction_certifier.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_metrics::monitored_mpsc::UnboundedSender;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13    BlockAPI as _, CertifiedBlock, CertifiedBlocksOutput, VerifiedBlock,
14    block::{BlockTransactionVotes, GENESIS_ROUND},
15    block_verifier::BlockVerifier,
16    context::Context,
17    dag_state::DagState,
18    stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21/// TransactionCertifier has the following purposes:
22/// 1. Certifies transactions and sends them to execute on the fastpath.
23/// 2. Keeps track of own votes on transactions, and allows the votes to be retrieved
24///    later in core after acceptance of the blocks containing the transactions.
25/// 3. Aggregates reject votes on transactions, and allows the aggregated votes
26///    to be retrieved during post-commit finalization.
27///
28/// A transaction is certified if a quorum of authorities in the causal history of a proposed block
29/// vote to accept the transaction. Accept votes are implicit in blocks: if a transaction is in
30/// the causal history of a block and the block does not vote to reject it, the block
31/// is considered to vote to accept the transaction. Transaction finalization are eventually resolved
32/// post commit, by checking if there is a certification of the transaction in the causal history
33/// of the leader. So only accept votes are only considered if they are in the causal history of own
34/// proposed blocks.
35///
36/// A transaction is rejected if a quorum of authorities vote to reject it. When this happens, it is
37/// guaranteed that no validator can observe a certification of the transaction, with <= f malicious
38/// stake.
39///
40/// A block is certified if every transaction in the block is either certified or rejected.
41/// TransactionCertifier outputs certified blocks.
42///
43/// The invariant between TransactionCertifier and post-commit finalization is that if a quorum of
44/// authorities certified a transaction for fastpath and executed it, then the transaction
45/// must also be finalized post consensus commit. The reverse is not true though, because
46/// fastpath execution is only a latency optimization, and not required for correctness.
47#[derive(Clone)]
48pub struct TransactionCertifier {
49    // The state of blocks being voted on and certified.
50    certifier_state: Arc<RwLock<CertifierState>>,
51    // Verify transactions during recovery.
52    block_verifier: Arc<dyn BlockVerifier>,
53    // The state of the DAG.
54    dag_state: Arc<RwLock<DagState>>,
55    // An unbounded channel to output certified blocks to Sui consensus block handler.
56    certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
57}
58
59impl TransactionCertifier {
60    pub fn new(
61        context: Arc<Context>,
62        block_verifier: Arc<dyn BlockVerifier>,
63        dag_state: Arc<RwLock<DagState>>,
64        certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
65    ) -> Self {
66        Self {
67            certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
68            block_verifier,
69            dag_state,
70            certified_blocks_sender,
71        }
72    }
73
74    /// Recovers all blocks from DB after the given round.
75    ///
76    /// This is useful for initializing the certifier state
77    /// for future commits and block proposals.
78    pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
79        let context = self.certifier_state.read().context.clone();
80        if !context.protocol_config.mysticeti_fastpath() {
81            info!("Skipping certifier recovery in non-mysticeti fast path mode");
82            return;
83        }
84
85        let store = self.dag_state.read().store().clone();
86
87        let recovery_start_round = after_round + 1;
88        info!(
89            "Recovering certifier state from round {}",
90            recovery_start_round,
91        );
92
93        let authorities = context
94            .committee
95            .authorities()
96            .map(|(index, _)| index)
97            .collect::<Vec<_>>();
98        for authority_index in authorities {
99            let blocks = store
100                .scan_blocks_by_author(authority_index, recovery_start_round)
101                .unwrap();
102            info!(
103                "Recovered and voting on {} blocks from authority {} {}",
104                blocks.len(),
105                authority_index,
106                context.committee.authority(authority_index).hostname
107            );
108            self.recover_and_vote_on_blocks(blocks);
109        }
110    }
111
112    /// Recovers and potentially votes on the given blocks.
113    ///
114    /// Because own votes on blocks are not stored, during recovery it is necessary to vote on
115    /// input blocks that are above GC round and have not been included before, which can be
116    /// included in a future proposed block.
117    ///
118    /// In addition, add_voted_blocks() will eventually process reject votes contained in the input blocks.
119    pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
120        let context = self.certifier_state.read().context.clone();
121        let should_vote_blocks = {
122            let dag_state = self.dag_state.read();
123            let gc_round = dag_state.gc_round();
124            blocks
125                .iter()
126                // Must make sure the block is above GC round before calling has_been_included().
127                .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
128                .collect::<Vec<_>>()
129        };
130        let voted_blocks = blocks
131            .into_iter()
132            .zip(should_vote_blocks)
133            .map(|(b, should_vote)| {
134                if !should_vote {
135                    // Voting is unnecessary for blocks already included in own proposed blocks,
136                    // or outside of local DAG GC bound.
137                    (b, vec![])
138                } else {
139                    // Voting is needed for blocks above GC round and not yet included in own proposed blocks.
140                    // A block proposal can include the input block later and retries own votes on it.
141                    let reject_transaction_votes =
142                        self.block_verifier.vote(&b).unwrap_or_else(|e| {
143                            panic!(
144                                "Failed to vote on block {} (own_index={}) during recovery: {}",
145                                b.reference(),
146                                context.own_index,
147                                e
148                            )
149                        });
150                    (b, reject_transaction_votes)
151                }
152            })
153            .collect::<Vec<_>>();
154        self.certifier_state.write().add_voted_blocks(voted_blocks);
155        // Do not send certified blocks to the fastpath output channel during recovery,
156        // because these transactions could have been executed and fastpath latency optimization is
157        // unnecessary for recovered transactions.
158    }
159
160    /// Stores own reject votes on input blocks, and aggregates reject votes from the input blocks.
161    /// Newly certified blocks are sent to the fastpath output channel.
162    pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
163        let certified_blocks = self.certifier_state.write().add_voted_blocks(voted_blocks);
164        self.send_certified_blocks(certified_blocks);
165    }
166
167    /// Aggregates accept votes from the own proposed block.
168    /// Newly certified blocks are sent to the fastpath output channel.
169    pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
170        let certified_blocks = self
171            .certifier_state
172            .write()
173            .add_proposed_block(proposed_block);
174        self.send_certified_blocks(certified_blocks);
175    }
176
177    // Sends certified blocks to the fastpath output channel.
178    fn send_certified_blocks(&self, certified_blocks: Vec<CertifiedBlock>) {
179        if certified_blocks.is_empty() {
180            return;
181        }
182        if let Err(e) = self.certified_blocks_sender.send(CertifiedBlocksOutput {
183            blocks: certified_blocks,
184        }) {
185            tracing::warn!("Failed to send certified blocks: {:?}", e);
186        }
187    }
188
189    /// Retrieves own votes on peer block transactions.
190    pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
191        let mut votes = vec![];
192        let certifier_state = self.certifier_state.read();
193        for block_ref in block_refs {
194            if block_ref.round <= certifier_state.gc_round {
195                continue;
196            }
197            let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
198                panic!("Ancestor block {} not found in certifier state", block_ref)
199            });
200            if !vote_info.own_reject_txn_votes.is_empty() {
201                votes.push(BlockTransactionVotes {
202                    block_ref,
203                    rejects: vote_info.own_reject_txn_votes.clone(),
204                });
205            }
206        }
207        votes
208    }
209
210    /// Retrieves transactions in the block that have received reject votes, and the total stake of the votes.
211    /// TransactionIndex not included in the output has no reject votes.
212    /// Returns None if no information is found for the block.
213    pub(crate) fn get_reject_votes(
214        &self,
215        block_ref: &BlockRef,
216    ) -> Option<Vec<(TransactionIndex, Stake)>> {
217        let accumulated_reject_votes = self
218            .certifier_state
219            .read()
220            .votes
221            .get(block_ref)?
222            .reject_txn_votes
223            .iter()
224            .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
225            .collect::<Vec<_>>();
226        Some(accumulated_reject_votes)
227    }
228
229    /// Runs garbage collection on the internal state by removing data for blocks <= gc_round,
230    /// and updates the GC round for the certifier.
231    ///
232    /// IMPORTANT: the gc_round used here can trail the latest gc_round from DagState.
233    /// This is because the gc round here is determined by CommitFinalizer, which needs to process
234    /// commits before the latest commit in DagState. Reject votes received by transactions below
235    /// local DAG gc_round may still need to be accessed from CommitFinalizer.
236    pub(crate) fn run_gc(&self, gc_round: Round) {
237        let dag_state_gc_round = self.dag_state.read().gc_round();
238        assert!(
239            gc_round <= dag_state_gc_round,
240            "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
241            gc_round,
242            dag_state_gc_round
243        );
244        self.certifier_state.write().update_gc_round(gc_round);
245    }
246}
247
248/// CertifierState keeps track of votes received by each transaction and block,
249/// and helps determine if votes reach a quorum. Reject votes can start accumulating
250/// even before the target block is received by this authority.
251struct CertifierState {
252    context: Arc<Context>,
253
254    // Maps received blocks' refs to votes on those blocks from other blocks.
255    // Even if a block has no reject votes on its transactions, it still has an entry here.
256    votes: BTreeMap<BlockRef, VoteInfo>,
257
258    // Highest round where blocks are GC'ed.
259    gc_round: Round,
260}
261
262impl CertifierState {
263    fn new(context: Arc<Context>) -> Self {
264        Self {
265            context,
266            votes: BTreeMap::new(),
267            gc_round: GENESIS_ROUND,
268        }
269    }
270
271    fn add_voted_blocks(
272        &mut self,
273        voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
274    ) -> Vec<CertifiedBlock> {
275        let mut certified_blocks = vec![];
276        for (voted_block, reject_txn_votes) in voted_blocks {
277            let blocks = self.add_voted_block(voted_block, reject_txn_votes);
278            certified_blocks.extend(blocks);
279        }
280
281        if !certified_blocks.is_empty() {
282            self.context
283                .metrics
284                .node_metrics
285                .certifier_output_blocks
286                .with_label_values(&["voted"])
287                .inc_by(certified_blocks.len() as u64);
288        }
289
290        certified_blocks
291    }
292
293    fn add_voted_block(
294        &mut self,
295        voted_block: VerifiedBlock,
296        reject_txn_votes: Vec<TransactionIndex>,
297    ) -> Vec<CertifiedBlock> {
298        if voted_block.round() <= self.gc_round {
299            // Ignore the block and own votes, since they are outside of certifier GC bound.
300            return vec![];
301        }
302
303        // Count own reject votes against each peer authority.
304        let peer_hostname = &self
305            .context
306            .committee
307            .authority(voted_block.author())
308            .hostname;
309        self.context
310            .metrics
311            .node_metrics
312            .certifier_own_reject_votes
313            .with_label_values(&[peer_hostname])
314            .inc_by(reject_txn_votes.len() as u64);
315
316        // Initialize the entry for the voted block.
317        let vote_info = self.votes.entry(voted_block.reference()).or_default();
318        if vote_info.block.is_some() {
319            // Input block has already been processed and added to the state.
320            return vec![];
321        }
322        vote_info.block = Some(voted_block.clone());
323        vote_info.own_reject_txn_votes = reject_txn_votes;
324
325        let mut certified_blocks = vec![];
326
327        let now = self.context.clock.timestamp_utc_ms();
328
329        // Update reject votes from the input block.
330        for block_votes in voted_block.transaction_votes() {
331            if block_votes.block_ref.round <= self.gc_round {
332                // Block is outside of GC bound.
333                continue;
334            }
335            let vote_info = self.votes.entry(block_votes.block_ref).or_default();
336            for reject in &block_votes.rejects {
337                vote_info
338                    .reject_txn_votes
339                    .entry(*reject)
340                    .or_default()
341                    .add_unique(voted_block.author(), &self.context.committee);
342            }
343            // Check if the target block is now certified after including the reject votes.
344            // NOTE: votes can already exist for the target block and its transactions.
345            if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
346                let authority_name = self
347                    .context
348                    .committee
349                    .authority(certified_block.block.author())
350                    .hostname
351                    .clone();
352                self.context
353                    .metrics
354                    .node_metrics
355                    .certifier_block_latency
356                    .with_label_values(&[&authority_name])
357                    .observe(
358                        Duration::from_millis(
359                            now.saturating_sub(certified_block.block.timestamp_ms()),
360                        )
361                        .as_secs_f64(),
362                    );
363                certified_blocks.push(certified_block);
364            }
365        }
366
367        certified_blocks
368    }
369
370    fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
371        if proposed_block.round() <= self.gc_round + 2 {
372            // Skip if transactions that can be certified have already been GC'ed.
373            // Skip also when the proposed block has been GC'ed from the certifier state.
374            // This is possible because this function (add_proposed_block()) is async from
375            // commit finalization, which advances the GC round of the certifier.
376            return vec![];
377        }
378        debug!(
379            "Adding proposed block {}; gc round: {}",
380            proposed_block.reference(),
381            self.gc_round
382        );
383
384        if !self.votes.contains_key(&proposed_block.reference()) {
385            self.context
386                .metrics
387                .node_metrics
388                .certifier_missing_ancestor_during_certification
389                .with_label_values(&["proposed_block_not_found"])
390                .inc();
391            debug!(
392                "Proposed block {} not found in certifier state. GC round: {}",
393                proposed_block.reference(),
394                self.gc_round,
395            );
396            return vec![];
397        }
398
399        let now = self.context.clock.timestamp_utc_ms();
400
401        // Certify transactions based on the accept votes from the proposed block's parents.
402        // Some ancestor blocks may not be found, because either they have been GC'ed due to timing
403        // issues or they were not recovered. It is ok to skip certifying blocks, which are best effort.
404        let mut certified_blocks = vec![];
405        for voting_ancestor in proposed_block.ancestors() {
406            // Votes are limited to 1 round before the proposed block.
407            if voting_ancestor.round + 1 != proposed_block.round() {
408                continue;
409            }
410            let Some(voting_info) = self.votes.get(voting_ancestor) else {
411                self.context
412                    .metrics
413                    .node_metrics
414                    .certifier_missing_ancestor_during_certification
415                    .with_label_values(&["voting_info_not_found"])
416                    .inc();
417                debug!(
418                    "Proposed block {}: voting info not found for ancestor {}",
419                    proposed_block.reference(),
420                    voting_ancestor
421                );
422                continue;
423            };
424            let Some(voting_block) = voting_info.block.clone() else {
425                self.context
426                    .metrics
427                    .node_metrics
428                    .certifier_missing_ancestor_during_certification
429                    .with_label_values(&["voting_block_not_found"])
430                    .inc();
431                debug!(
432                    "Proposed block {}: voting block not found for ancestor {}",
433                    proposed_block.reference(),
434                    voting_ancestor
435                );
436                continue;
437            };
438            for target_ancestor in voting_block.ancestors() {
439                // Target blocks are 1 round before the voting block.
440                if target_ancestor.round + 1 != voting_block.round() {
441                    continue;
442                }
443                let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
444                    self.context
445                        .metrics
446                        .node_metrics
447                        .certifier_missing_ancestor_during_certification
448                        .with_label_values(&["target_vote_info_not_found"])
449                        .inc();
450                    debug!(
451                        "Proposed block {}: target voting info not found for ancestor {}",
452                        proposed_block.reference(),
453                        target_ancestor
454                    );
455                    continue;
456                };
457                target_vote_info
458                    .accept_block_votes
459                    .add_unique(voting_block.author(), &self.context.committee);
460                // Check if the target block is now certified after including the accept votes.
461                if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
462                {
463                    let authority_name = self
464                        .context
465                        .committee
466                        .authority(certified_block.block.author())
467                        .hostname
468                        .clone();
469                    self.context
470                        .metrics
471                        .node_metrics
472                        .certifier_block_latency
473                        .with_label_values(&[&authority_name])
474                        .observe(
475                            Duration::from_millis(
476                                now.saturating_sub(certified_block.block.timestamp_ms()),
477                            )
478                            .as_secs_f64(),
479                        );
480                    certified_blocks.push(certified_block);
481                }
482            }
483        }
484
485        if !certified_blocks.is_empty() {
486            self.context
487                .metrics
488                .node_metrics
489                .certifier_output_blocks
490                .with_label_values(&["proposed"])
491                .inc_by(certified_blocks.len() as u64);
492        }
493
494        certified_blocks
495    }
496
497    /// Updates the GC round and cleans up obsolete internal state.
498    fn update_gc_round(&mut self, gc_round: Round) {
499        self.gc_round = gc_round;
500        while let Some((block_ref, _)) = self.votes.first_key_value() {
501            if block_ref.round <= self.gc_round {
502                self.votes.pop_first();
503            } else {
504                break;
505            }
506        }
507
508        self.context
509            .metrics
510            .node_metrics
511            .certifier_gc_round
512            .set(self.gc_round as i64);
513    }
514}
515
516/// VoteInfo keeps track of votes received for each transaction of this block,
517/// possibly even before the block is received by this authority.
518struct VoteInfo {
519    // Content of the block.
520    // None if the blocks has not been received.
521    block: Option<VerifiedBlock>,
522    // Rejection votes by this authority on this block.
523    // This field is written when the block is first received and its transactions are voted on.
524    // It is read from core after the block is accepted.
525    own_reject_txn_votes: Vec<TransactionIndex>,
526    // Accumulates implicit accept votes for the block and all transactions.
527    accept_block_votes: StakeAggregator<QuorumThreshold>,
528    // Accumulates reject votes per transaction in this block.
529    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
530    // Whether this block has been certified already.
531    is_certified: bool,
532}
533
534impl VoteInfo {
535    // If this block can now be certified, returns the output.
536    // Otherwise, returns None.
537    fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
538        let committee = &context.committee;
539        if self.is_certified {
540            // Skip if already certified.
541            return None;
542        }
543        let Some(block) = self.block.as_ref() else {
544            // Skip if the content of the block has not been received.
545            return None;
546        };
547
548        let peer_hostname = &committee.authority(block.author()).hostname;
549
550        if !self.accept_block_votes.reached_threshold(committee) {
551            // Skip if the block is not certified.
552            return None;
553        }
554        let mut rejected = vec![];
555        for (idx, reject_txn_votes) in &self.reject_txn_votes {
556            // The transaction is voted to be rejected.
557            if reject_txn_votes.reached_threshold(committee) {
558                context
559                    .metrics
560                    .node_metrics
561                    .certifier_rejected_transactions
562                    .with_label_values(&[peer_hostname])
563                    .inc();
564                rejected.push(*idx);
565                continue;
566            }
567            // If a transaction does not have a quorum of accept votes minus the reject votes,
568            // it is neither rejected nor certified. In this case the whole block cannot
569            // be considered as certified.
570
571            // accept_block_votes can be < reject_txn_votes on the transaction when reject_txn_votes
572            // come from blocks more than 1 round higher, which do not add to the
573            // accept votes of the block.
574            //
575            // Also, the total accept votes of a transactions is undercounted here.
576            // If a block has accept votes from a quorum of authorities A, B and C, but one transaction
577            // has a reject vote from D, the transaction and block are technically certified
578            // and can be sent to fastpath. However, the computation here will not certify the transaction
579            // or the block. This is still fine because the fastpath certification is optional.
580            // The definite status of the transaction will be decided during post commit finalization.
581            if self
582                .accept_block_votes
583                .stake()
584                .saturating_sub(reject_txn_votes.stake())
585                < committee.quorum_threshold()
586            {
587                return None;
588            }
589        }
590        // The block is certified.
591        let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
592        tracing::trace!(
593            "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
594            block.reference(),
595            rejected.len()
596        );
597        context
598            .metrics
599            .node_metrics
600            .certifier_accepted_transactions
601            .with_label_values(&[peer_hostname])
602            .inc_by(accepted_txn_count as u64);
603        self.is_certified = true;
604        Some(CertifiedBlock {
605            block: block.clone(),
606            rejected,
607        })
608    }
609}
610
611impl Default for VoteInfo {
612    fn default() -> Self {
613        Self {
614            block: None,
615            own_reject_txn_votes: vec![],
616            accept_block_votes: StakeAggregator::new(),
617            reject_txn_votes: BTreeMap::new(),
618            is_certified: false,
619        }
620    }
621}
622
623#[cfg(test)]
624mod test {
625    use consensus_config::AuthorityIndex;
626    use itertools::Itertools;
627    use rand::seq::SliceRandom as _;
628
629    use crate::{
630        TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
631        test_dag_builder::DagBuilder,
632    };
633
634    use super::*;
635
636    #[tokio::test]
637    async fn test_vote_info_basic() {
638        telemetry_subscribers::init_for_testing();
639        let (context, _) = Context::new_for_test(7);
640        let committee = &context.committee;
641
642        // No accept votes.
643        {
644            let mut vote_info = VoteInfo::default();
645            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
646            vote_info.block = Some(block.clone());
647
648            assert!(vote_info.take_certified_output(&context).is_none());
649        }
650
651        // Accept votes but not enough.
652        {
653            let mut vote_info = VoteInfo::default();
654            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
655            vote_info.block = Some(block.clone());
656            for i in 0..4 {
657                vote_info
658                    .accept_block_votes
659                    .add_unique(AuthorityIndex::new_for_test(i), committee);
660            }
661
662            assert!(vote_info.take_certified_output(&context).is_none());
663        }
664
665        // Enough accept votes but no block.
666        {
667            let mut vote_info = VoteInfo::default();
668            for i in 0..5 {
669                vote_info
670                    .accept_block_votes
671                    .add_unique(AuthorityIndex::new_for_test(i), committee);
672            }
673
674            assert!(vote_info.take_certified_output(&context).is_none());
675        }
676
677        // A quorum of accept votes and block exists.
678        {
679            let mut vote_info = VoteInfo::default();
680            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
681            vote_info.block = Some(block.clone());
682            for i in 0..4 {
683                vote_info
684                    .accept_block_votes
685                    .add_unique(AuthorityIndex::new_for_test(i), committee);
686            }
687
688            // The block is not certified.
689            assert!(vote_info.take_certified_output(&context).is_none());
690
691            // Add 1 more accept vote from a different authority.
692            vote_info
693                .accept_block_votes
694                .add_unique(AuthorityIndex::new_for_test(4), committee);
695
696            // The block is now certified.
697            let certified_block = vote_info.take_certified_output(&context).unwrap();
698            assert_eq!(certified_block.block.reference(), block.reference());
699
700            // Certified block cannot be taken again.
701            assert!(vote_info.take_certified_output(&context).is_none());
702        }
703
704        // A quorum of accept and reject votes.
705        {
706            let mut vote_info = VoteInfo::default();
707            // Create a block with 7 transactions.
708            let block = VerifiedBlock::new_for_test(
709                TestBlock::new(1, 1)
710                    .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
711                    .build(),
712            );
713            vote_info.block = Some(block.clone());
714            // Add 5 accept votes which form a quorum.
715            for i in 0..5 {
716                vote_info
717                    .accept_block_votes
718                    .add_unique(AuthorityIndex::new_for_test(i), committee);
719            }
720            // For transactions 3 - 7 ..
721            for reject_tx_idx in 3..8 {
722                vote_info
723                    .reject_txn_votes
724                    .insert(reject_tx_idx, StakeAggregator::new());
725                // .. add 5 reject votes which form a quorum.
726                for authority_idx in 0..5 {
727                    vote_info
728                        .reject_txn_votes
729                        .get_mut(&reject_tx_idx)
730                        .unwrap()
731                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
732                }
733            }
734
735            // The block is certified.
736            let certified_block = vote_info.take_certified_output(&context).unwrap();
737            assert_eq!(certified_block.block.reference(), block.reference());
738
739            // Certified block cannot be taken again.
740            assert!(vote_info.take_certified_output(&context).is_none());
741        }
742
743        // A transaction in the block is neither rejected nor certified.
744        {
745            let mut vote_info = VoteInfo::default();
746            // Create a block with 6 transactions.
747            let block = VerifiedBlock::new_for_test(
748                TestBlock::new(1, 1)
749                    .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
750                    .build(),
751            );
752            vote_info.block = Some(block.clone());
753            // Add 5 accept votes which form a quorum.
754            for i in 0..5 {
755                vote_info
756                    .accept_block_votes
757                    .add_unique(AuthorityIndex::new_for_test(i), committee);
758            }
759            // For transactions 3 - 5 ..
760            for reject_tx_idx in 3..6 {
761                vote_info
762                    .reject_txn_votes
763                    .insert(reject_tx_idx, StakeAggregator::new());
764                // .. add 5 reject votes which form a quorum.
765                for authority_idx in 0..5 {
766                    vote_info
767                        .reject_txn_votes
768                        .get_mut(&reject_tx_idx)
769                        .unwrap()
770                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
771                }
772            }
773            // For transaction 6, add 4 reject votes which do not form a quorum.
774            vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
775            for authority_idx in 0..4 {
776                vote_info
777                    .reject_txn_votes
778                    .get_mut(&5)
779                    .unwrap()
780                    .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
781            }
782
783            // The block is not certified.
784            assert!(vote_info.take_certified_output(&context).is_none());
785
            // Add 1 more reject vote from a different authority for transaction 6.
787            vote_info
788                .reject_txn_votes
789                .get_mut(&5)
790                .unwrap()
791                .add_unique(AuthorityIndex::new_for_test(4), committee);
792
793            // The block is now certified.
794            let certified_block = vote_info.take_certified_output(&context).unwrap();
795            assert_eq!(certified_block.block.reference(), block.reference());
796
797            // Certified block cannot be taken again.
798            assert!(vote_info.take_certified_output(&context).is_none());
799        }
800    }
801
802    #[tokio::test]
803    async fn test_certify_basic() {
804        telemetry_subscribers::init_for_testing();
805        let (context, _) = Context::new_for_test(4);
806        let context = Arc::new(context);
807
808        // GIVEN: Round 1: blocks from all authorities are fully connected to the genesis blocks.
809        let mut dag_builder = DagBuilder::new(context.clone());
810        dag_builder.layer(1).num_transactions(4).build();
811        let round_1_blocks = dag_builder.all_blocks();
812        let mut all_blocks = round_1_blocks.clone();
813
814        // THEN: Round 1: no block can be certified yet.
815        let mut certifier = CertifierState::new(context.clone());
816        let certified_blocks = certifier
817            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
818        assert!(certified_blocks.is_empty());
819
820        // GIVEN: Round 2: A, B & C blocks at round 2 are connected to only A, B & C blocks at round 1.
821        // AND: A & B blocks reject transaction 1 from the round 1 B block.
822        // AND: A, B & C blocks reject transaction 2 from the round 1 C block.
823        let transactions = (0..4)
824            .map(|_| Transaction::new(vec![0_u8; 16]))
825            .collect::<Vec<_>>();
826        let ancestors = round_1_blocks
827            .iter()
828            .filter_map(|b| {
829                if b.author().value() < 3 {
830                    Some(b.reference())
831                } else {
832                    None
833                }
834            })
835            .collect::<Vec<_>>();
836        for author in 0..3 {
837            let mut block = TestBlock::new(2, author)
838                .set_ancestors(ancestors.clone())
839                .set_transactions(transactions.clone());
840            let mut votes = vec![];
841            for i in 0..(3 - author) {
842                let j = author + i;
843                if j == 0 {
844                    // Do not reject transaction 0 from the round 1 A block.
845                    continue;
846                }
847                votes.push(BlockTransactionVotes {
848                    block_ref: round_1_blocks[j as usize].reference(),
849                    rejects: vec![j as u16],
850                });
851            }
852            block = block.set_transaction_votes(votes);
853            all_blocks.push(VerifiedBlock::new_for_test(block.build()));
854        }
855
856        // THEN: Round 2: no block can be certified yet.
857        let mut certifier = CertifierState::new(context.clone());
858        let certified_blocks =
859            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
860        assert!(certified_blocks.is_empty());
861
862        // GIVEN: Round 3: all blocks connected to round 2 blocks and round 1 D block,
863        let ancestors = all_blocks
864            .iter()
865            .filter_map(|b| {
866                if b.round() == 1 && b.author().value() == 3 {
867                    Some(b.reference())
868                } else if b.round() == 2 {
869                    assert_ne!(b.author().value(), 3);
870                    Some(b.reference())
871                } else {
872                    None
873                }
874            })
875            .collect::<Vec<_>>();
876        assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
877        let mut round_3_blocks = vec![];
878        for author in 0..4 {
879            let block = TestBlock::new(3, author)
880                .set_ancestors(ancestors.clone())
881                .set_transactions(transactions.clone());
882            round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
883        }
884
885        // THEN: Round 3: with 1 round 3 block, A & C round 1 blocks are certified.
886        let mut certifier = CertifierState::new(context.clone());
887        certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
888        let proposed_block = round_3_blocks.pop().unwrap();
889        let mut certified_blocks =
890            certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
891        certified_blocks.extend(certifier.add_proposed_block(proposed_block));
892        assert_eq!(
893            certified_blocks.len(),
894            2,
895            "Certified blocks {:?}",
896            certified_blocks
897                .iter()
898                .map(|b| b.block.reference().to_string())
899                .join(",")
900        );
901        assert_eq!(
902            certified_blocks[0].block.reference(),
903            round_1_blocks[0].reference()
904        );
905        assert!(certified_blocks[0].rejected.is_empty());
906        assert_eq!(
907            certified_blocks[1].block.reference(),
908            round_1_blocks[2].reference()
909        );
910        assert_eq!(certified_blocks[1].rejected, vec![2]);
911    }
912
913    // TODO: add reject votes.
914    #[tokio::test]
915    async fn test_certify_randomized() {
916        telemetry_subscribers::init_for_testing();
917        let num_authorities: u32 = 7;
918        let (context, _) = Context::new_for_test(num_authorities as usize);
919        let context = Arc::new(context);
920
921        // Create minimal connected blocks up to num_rounds.
922        let num_rounds = 50;
923        let mut dag_builder = DagBuilder::new(context.clone());
924        dag_builder
925            .layers(1..=num_rounds)
926            .min_ancestor_links(false, None)
927            .build();
928        let all_blocks = dag_builder.all_blocks();
929
930        // Get the certified blocks, which depends on the structure of the minimum connected DAG.
931        let mut certifier = CertifierState::new(context.clone());
932        let mut expected_certified_blocks =
933            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
934        expected_certified_blocks.sort_by_key(|b| b.block.reference());
935
936        // Adding all blocks to certifier in random order should still produce the same set of certified blocks.
937        for _ in 0..100 {
938            // Add the blocks to certifier in random order.
939            let mut all_blocks = all_blocks.clone();
940            all_blocks.shuffle(&mut rand::thread_rng());
941            let mut certifier = CertifierState::new(context.clone());
942
943            // Take the certified blocks.
944            let mut actual_certified_blocks = certifier
945                .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
946            actual_certified_blocks.sort_by_key(|b| b.block.reference());
947
948            // Ensure the certified blocks are the expected ones.
949            assert_eq!(
950                actual_certified_blocks.len(),
951                expected_certified_blocks.len()
952            );
953            for (actual, expected) in actual_certified_blocks
954                .iter()
955                .zip(expected_certified_blocks.iter())
956            {
957                assert_eq!(actual.block.reference(), expected.block.reference());
958                assert_eq!(actual.rejected, expected.rejected);
959            }
960        }
961    }
962}