consensus_core/
commit_test_fixture.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Shared test fixture for commit-related tests.
5//! Used by both commit_finalizer.rs tests and randomized_tests.rs.
6
7use std::collections::{BTreeMap, BTreeSet, VecDeque};
8use std::ops::Bound::Included;
9use std::sync::Arc;
10
11use consensus_config::{AuthorityIndex, Stake};
12use consensus_types::block::{BlockDigest, BlockRef};
13use consensus_types::block::{Round, TransactionIndex};
14use mysten_metrics::monitored_mpsc::unbounded_channel;
15use parking_lot::RwLock;
16use rand::prelude::SliceRandom;
17use rand::{Rng, rngs::StdRng};
18
19use crate::Transaction;
20use crate::block::{BlockTransactionVotes, TestBlock, genesis_blocks};
21use crate::{
22    block::{BlockAPI, VerifiedBlock},
23    block_manager::BlockManager,
24    block_verifier::NoopBlockVerifier,
25    commit::{CommittedSubDag, DecidedLeader},
26    commit_finalizer::CommitFinalizer,
27    context::Context,
28    dag_state::DagState,
29    leader_schedule::{LeaderSchedule, LeaderSwapTable},
30    linearizer::Linearizer,
31    storage::mem_store::MemStore,
32    transaction_certifier::TransactionCertifier,
33    universal_committer::{
34        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
35    },
36};
37
38/// A test fixture that provides all the components needed for testing commit processing,
39/// similar to the actual logic in Core::try_commit() and CommitFinalizer::run().
40pub struct CommitTestFixture {
41    pub context: Arc<Context>,
42    pub linearizer: Linearizer,
43    pub transaction_certifier: TransactionCertifier,
44    pub commit_finalizer: CommitFinalizer,
45
46    dag_state: Arc<RwLock<DagState>>,
47    block_manager: BlockManager,
48    committer: UniversalCommitter,
49}
50
51impl CommitTestFixture {
52    /// Creates a new CommitTestFixture from a context.
53    pub fn new(context: Arc<Context>) -> Self {
54        let leader_schedule = Arc::new(LeaderSchedule::new(
55            context.clone(),
56            LeaderSwapTable::default(),
57        ));
58        let dag_state = Arc::new(RwLock::new(DagState::new(
59            context.clone(),
60            Arc::new(MemStore::new()),
61        )));
62
63        // Create committer with pipelining and only 1 leader per leader round
64        let committer =
65            UniversalCommitterBuilder::new(context.clone(), leader_schedule, dag_state.clone())
66                .with_pipeline(true)
67                .build();
68
69        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
70
71        let linearizer = Linearizer::new(context.clone(), dag_state.clone());
72        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
73        let transaction_certifier = TransactionCertifier::new(
74            context.clone(),
75            Arc::new(NoopBlockVerifier {}),
76            dag_state.clone(),
77            blocks_sender,
78        );
79        let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
80        let commit_finalizer = CommitFinalizer::new(
81            context.clone(),
82            dag_state.clone(),
83            transaction_certifier.clone(),
84            commit_sender,
85        );
86
87        Self {
88            context,
89            linearizer,
90            transaction_certifier,
91            commit_finalizer,
92            dag_state,
93            block_manager,
94            committer,
95        }
96    }
97
98    /// Creates a new CommitTestFixture with more options.
99    pub fn with_options(
100        num_authorities: usize,
101        authority_index: u32,
102        gc_depth: Option<u32>,
103    ) -> Self {
104        Self::new(Self::context_with_options(
105            num_authorities,
106            authority_index,
107            gc_depth,
108        ))
109    }
110
111    pub fn context_with_options(
112        num_authorities: usize,
113        authority_index: u32,
114        gc_depth: Option<u32>,
115    ) -> Arc<Context> {
116        let (mut context, _keys) = Context::new_for_test(num_authorities);
117        if let Some(gc_depth) = gc_depth {
118            context
119                .protocol_config
120                .set_consensus_gc_depth_for_testing(gc_depth);
121        }
122        // Skipping equivocation validation is necessary when testing with equivocators.
123        // Also it is ok when only testing the commit code path.
124        context.parameters.internal.skip_equivocation_validation = true;
125        Arc::new(context.with_authority_index(AuthorityIndex::new_for_test(authority_index)))
126    }
127
128    // Adds the blocks to the transaction certifier and then tries to accept them via BlockManager.
129    /// This registers the blocks for reject vote tracking (with no reject votes).
130    pub fn try_accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
131        self.transaction_certifier
132            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
133        self.block_manager.try_accept_blocks(blocks);
134    }
135
136    /// Adds blocks to the transaction certifier and dag state.
137    /// This registers the blocks for reject vote tracking (with no reject votes).
138    pub fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
139        let blocks_and_votes = blocks.iter().map(|b| (b.clone(), vec![])).collect();
140        self.transaction_certifier
141            .add_voted_blocks(blocks_and_votes);
142        self.dag_state.write().accept_blocks(blocks);
143    }
144
145    pub fn add_blocks_with_own_votes(
146        &self,
147        blocks_and_votes: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
148    ) {
149        let blocks = blocks_and_votes.iter().map(|(b, _)| b.clone()).collect();
150        self.transaction_certifier
151            .add_voted_blocks(blocks_and_votes);
152        self.dag_state.write().accept_blocks(blocks);
153    }
154
155    /// Checks if the block manager has no suspended blocks.
156    #[cfg(test)]
157    pub(crate) fn has_no_suspended_blocks(&self) -> bool {
158        self.block_manager.is_empty()
159    }
160
161    /// Tries to decide leaders, process and finalize commits, returning finalized commits
162    /// and the updated last_decided slot.
163    pub async fn try_commit(
164        &mut self,
165        last_decided: crate::block::Slot,
166    ) -> (Vec<CommittedSubDag>, crate::block::Slot) {
167        let sequence = self.committer.try_decide(last_decided);
168        let new_last_decided = sequence
169            .last()
170            .map(|leader| leader.slot())
171            .unwrap_or(last_decided);
172        let finalized = self.process_commits(sequence).await;
173        (finalized, new_last_decided)
174    }
175
176    /// Process decided leaders through linearizer and commit finalizer,
177    /// similar to Core::try_commit() and CommitFinalizer::run().
178    ///
179    /// This extracts leader blocks from DecidedLeader::Commit, creates CommittedSubDags
180    /// via the linearizer, and processes them through the commit finalizer.
181    pub(crate) async fn process_commits(
182        &mut self,
183        sequence: Vec<DecidedLeader>,
184    ) -> Vec<CommittedSubDag> {
185        // Extract leader blocks from DecidedLeader::Commit (skip Skip decisions)
186        let leaders: Vec<VerifiedBlock> = sequence
187            .into_iter()
188            .filter_map(|d| match d {
189                DecidedLeader::Commit(block, _) => Some(block),
190                DecidedLeader::Skip(_) => None,
191            })
192            .collect();
193
194        if leaders.is_empty() {
195            return vec![];
196        }
197
198        // Use linearizer to create CommittedSubDag
199        let committed_sub_dags = self.linearizer.handle_commit(leaders);
200
201        // After handle_commit(), the GC round is updated. We need to unsuspend any blocks that were
202        // suspended because of missing ancestors that are now GC'ed.
203        self.block_manager
204            .try_unsuspend_blocks_for_latest_gc_round();
205
206        // Process through commit finalizer
207        let mut finalized_commits = vec![];
208        for mut subdag in committed_sub_dags {
209            subdag.decided_with_local_blocks = true;
210            let finalized = self.commit_finalizer.process_commit(subdag).await;
211            finalized_commits.extend(finalized);
212        }
213
214        finalized_commits
215    }
216}
217
218/// Compare commit sequences across all runs, asserting they are identical.
219/// Returns the shortest commit sequence for additional assertions if needed.
220pub fn assert_commit_sequences_match(
221    commit_sequences: Vec<Vec<CommittedSubDag>>,
222) -> Vec<CommittedSubDag> {
223    let (shortest_idx, shortest_sequence) = commit_sequences
224        .iter()
225        .enumerate()
226        .min_by_key(|(_, seq)| seq.len())
227        .expect("commit_sequences should not be empty");
228
229    for (run, commit_sequence) in commit_sequences.iter().enumerate() {
230        // Since INDIRECT_REJECT_DEPTH is 3, the maximum number of commits buffered in CommitFinalizer is 3.
231        // And because of the direct finalization optimization, it might happen the last 3 commits are pending in
232        // one run but all finalized in another.
233        assert!(
234            commit_sequence.len() <= shortest_sequence.len() + 3,
235            "Commit sequence at run {run} is more than 3 commits longer than shortest (run {shortest_idx}): {} vs {}",
236            commit_sequence.len(),
237            shortest_sequence.len()
238        );
239
240        for (commit_index, (c1, c2)) in commit_sequence
241            .iter()
242            .zip(shortest_sequence.iter())
243            .enumerate()
244        {
245            assert_eq!(
246                c1.leader, c2.leader,
247                "Leader mismatch at run {run} commit {commit_index}"
248            );
249            assert_eq!(
250                c1.commit_ref, c2.commit_ref,
251                "Commit sequence mismatch at run {run} commit {commit_index}"
252            );
253            assert_eq!(
254                c1.rejected_transactions_by_block, c2.rejected_transactions_by_block,
255                "Rejected transactions mismatch at run {run} commit {commit_index}"
256            );
257        }
258    }
259
260    let mut total_transactions = 0;
261    let mut rejected_transactions = 0;
262    let mut reject_votes = 0;
263    let mut blocks = 4;
264    for commit in shortest_sequence.iter() {
265        total_transactions += commit
266            .blocks
267            .iter()
268            .map(|block| block.transactions().len())
269            .sum::<usize>();
270        rejected_transactions += commit
271            .rejected_transactions_by_block
272            .values()
273            .map(|transactions| transactions.len())
274            .sum::<usize>();
275        reject_votes += commit
276            .blocks
277            .iter()
278            .map(|block| block.transaction_votes().len())
279            .sum::<usize>();
280        blocks += commit.blocks.len();
281    }
282
283    tracing::info!(
284        "Finished comparing commit sequences. Commits: {}, Blocks: {}, Total transactions: {}, Rejected transactions: {}, Reject votes: {}",
285        shortest_sequence.len(),
286        blocks,
287        total_transactions,
288        rejected_transactions,
289        reject_votes
290    );
291
292    shortest_sequence.clone()
293}
294
295// ---- RandomDag, RandomDagConfig and RandomDagIterator ----
296
297/// Configuration for generating a randomized DAG.
298pub struct RandomDagConfig {
299    // Number of distinct authorities creating blocks.
300    pub num_authorities: usize,
301    // Number of rounds to generate.
302    pub num_rounds: Round,
303    // Number of transactions per block.
304    pub num_transactions: u32,
305    // Percentage chance of transactions to randomly reject.
306    pub reject_percentage: u8,
307    // Each element specifies the authority index, and the number of equivocators
308    // that are acting under this authority.
309    pub equivocators: Vec<(AuthorityIndex, u16)>,
310}
311
312/// A randomly generated DAG for testing commit patterns with reject votes.
313pub struct RandomDag {
314    context: Arc<Context>,
315    pub blocks: Vec<VerifiedBlock>,
316    num_rounds: Round,
317}
318
319impl RandomDag {
320    /// Creates a new RandomDag with generated blocks containing transactions and reject votes.
321    pub fn new(context: Arc<Context>, rng: &mut StdRng, config: RandomDagConfig) -> Self {
322        let RandomDagConfig {
323            num_authorities: _,
324            num_rounds,
325            num_transactions,
326            reject_percentage,
327            equivocators,
328        } = config;
329
330        let committee = &context.committee;
331        let quorum_threshold = committee.quorum_threshold();
332        let total_stake = committee.total_stake();
333
334        // Create instance ID for each authority and equivocators.
335        let mut instances: Vec<InstanceID> = committee.authorities().map(|(a, _)| (a, 0)).collect();
336        for (authority, num_equivocators) in equivocators {
337            for i in 1..=num_equivocators {
338                instances.push((authority, i));
339            }
340        }
341
342        let genesis_blocks = genesis_blocks(&context)
343            .into_iter()
344            .map(|b| (b.author(), b))
345            .collect::<BTreeMap<AuthorityIndex, VerifiedBlock>>();
346        let last_round_blocks: Vec<VerifiedBlock> = genesis_blocks.values().cloned().collect();
347
348        // Store all blocks for lookup and range search.
349        let mut all_blocks: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
350        // Initialize with genesis blocks.
351        for block in &last_round_blocks {
352            all_blocks.insert(block.reference(), block.clone());
353        }
354
355        // Track the latest block per instance. Equivocators start from the same genesis blocks per authority.
356        let mut latest_blocks = instances
357            .iter()
358            .map(|&(a, i)| {
359                let b = genesis_blocks.get(&a).unwrap();
360                ((a, i), b.clone())
361            })
362            .collect::<BTreeMap<InstanceID, VerifiedBlock>>();
363
364        // Track included blocks per instance (simulates link_causal_history).
365        let mut included_refs = BTreeMap::<InstanceID, BTreeSet<BlockRef>>::new();
366
367        for r in 1..=num_rounds {
368            // Select random quorum-or-more stake to produce blocks this round.
369            let target_stake = rng.gen_range(quorum_threshold..=total_stake);
370
371            // Select random instances to produce blocks this round.
372            // Equivocations are allowed when multiple instances from the same authority propose at
373            // the same slot.
374            let mut proposers = instances.clone();
375            proposers.shuffle(rng);
376            let mut selected_stake = 0;
377            // Ensure stake across equivocations are counted only once.
378            let mut selected_authorities = vec![false; committee.size()];
379            let selected_proposers: Vec<_> = proposers
380                .into_iter()
381                .take_while(|instance| {
382                    if selected_stake >= target_stake {
383                        return false;
384                    }
385                    if !selected_authorities[instance.0.value()] {
386                        selected_authorities[instance.0.value()] = true;
387                        selected_stake += committee.stake(instance.0);
388                    }
389                    true
390                })
391                .collect();
392
393            let mut current_round_blocks = Vec::new();
394            for instance_id in selected_proposers {
395                let block = build_block_for_instance(
396                    &context,
397                    &instances,
398                    rng,
399                    r,
400                    instance_id,
401                    num_transactions,
402                    reject_percentage,
403                    &all_blocks,
404                    &mut latest_blocks,
405                    &mut included_refs,
406                );
407                current_round_blocks.push((instance_id, block));
408            }
409
410            // Update state with current round blocks.
411            for (instance_id, block) in current_round_blocks {
412                all_blocks.insert(block.reference(), block.clone());
413                latest_blocks.insert(instance_id, block);
414            }
415        }
416
417        RandomDag {
418            context,
419            blocks: all_blocks.values().cloned().collect(),
420            num_rounds,
421        }
422    }
423
424    /// Creates a RandomDag from existing blocks.
425    pub fn from_blocks(context: Arc<Context>, blocks: Vec<VerifiedBlock>) -> Self {
426        let num_rounds = blocks.iter().map(|b| b.round()).max().unwrap_or(0);
427        RandomDag {
428            context,
429            blocks,
430            num_rounds,
431        }
432    }
433
434    /// Creates an iterator yielding blocks in constrained random order.
435    pub fn random_iter<'a>(
436        &'a self,
437        rng: &'a mut StdRng,
438        max_step: Round,
439    ) -> RandomDagIterator<'a> {
440        RandomDagIterator::new(self, rng, max_step)
441    }
442}
443
444// Identifies a consensus instance, by its authority index and another index to differentiate between equivocators.
445type InstanceID = (AuthorityIndex, u16);
446
447/// Builds a single block for the given consensus instance at the specified round.
448fn build_block_for_instance(
449    context: &Arc<Context>,
450    instances: &[InstanceID],
451    rng: &mut StdRng,
452    round: Round,
453    own_instance: InstanceID,
454    num_transactions: u32,
455    reject_percentage: u8,
456    all_blocks: &BTreeMap<BlockRef, VerifiedBlock>,
457    latest_blocks: &mut BTreeMap<InstanceID, VerifiedBlock>,
458    included_refs: &mut BTreeMap<InstanceID, BTreeSet<BlockRef>>,
459) -> VerifiedBlock {
460    let committee = &context.committee;
461    let quorum_threshold = committee.quorum_threshold();
462    let own_authority = own_instance.0;
463
464    // Select blocks from the previous round until quorum stake is reached.
465    let prev_round = round - 1;
466    let mut prev_round_blocks: Vec<_> = all_blocks
467        .range((
468            Included(BlockRef::new(
469                prev_round,
470                AuthorityIndex::MIN,
471                BlockDigest::MIN,
472            )),
473            Included(BlockRef::new(
474                prev_round,
475                AuthorityIndex::MAX,
476                BlockDigest::MAX,
477            )),
478        ))
479        .map(|(_, b)| b)
480        .cloned()
481        .collect();
482    prev_round_blocks.shuffle(rng);
483    let mut parent_stake: Stake = 0;
484    // Ensure only one block gets selected per authority, regardless of equivocations.
485    let mut selected_authorities = vec![false; committee.size()];
486    let quorum_parents: Vec<_> = prev_round_blocks
487        .into_iter()
488        .filter_map(|b| {
489            if parent_stake >= quorum_threshold {
490                return None;
491            }
492            if selected_authorities[b.author().value()] {
493                return None;
494            }
495            selected_authorities[b.author().value()] = true;
496            parent_stake += committee.stake(b.author());
497            Some(b)
498        })
499        .collect();
500
501    // Find so far unselected instances.
502    let mut unselected_instances: Vec<_> = instances
503        .iter()
504        .filter(|(authority, _)| !selected_authorities[authority.value()])
505        .cloned()
506        .collect();
507
508    // Randomly select extra blocks to link to among ancestors.
509    unselected_instances.shuffle(rng);
510    // Use min of two uniform samples to bias toward fewer additional ancestors
511    // while maintaining non-zero probability for all counts.
512    let extra_count = rng
513        .gen_range(0..=unselected_instances.len())
514        .min(rng.gen_range(0..=unselected_instances.len()));
515    let additional_ancestor_blocks: Vec<_> = unselected_instances[0..extra_count]
516        .iter()
517        .filter_map(|&(authority, instance)| {
518            if selected_authorities[authority.value()] {
519                return None;
520            }
521            let block = latest_blocks.get(&(authority, instance))?;
522            assert!(
523                block.round() < round,
524                "latest_blocks should only contain blocks from previous rounds"
525            );
526            selected_authorities[authority.value()] = true;
527            Some(block.clone())
528        })
529        .collect();
530
531    // Combine ancestors: quorum parents + extra ancestors from unselected authorities + own latest block if necessary.
532    let mut ancestor_blocks = quorum_parents;
533    ancestor_blocks.extend(additional_ancestor_blocks);
534    if !ancestor_blocks.iter().any(|b| b.author() == own_authority) {
535        ancestor_blocks.push(latest_blocks[&own_instance].clone());
536    }
537    let ancestors: Vec<_> = ancestor_blocks.iter().map(|b| b.reference()).collect();
538
539    // Find newly connected blocks via BFS (similar to link_causal_history).
540    let mut newly_connected = Vec::new();
541    let mut queue = VecDeque::from_iter(ancestors.iter().copied());
542    while let Some(block_ref) = queue.pop_front() {
543        if block_ref.round == 0 {
544            continue; // Skip genesis blocks.
545        }
546        if included_refs
547            .entry(own_instance)
548            .or_default()
549            .contains(&block_ref)
550        {
551            continue; // Already included.
552        }
553        included_refs
554            .entry(own_instance)
555            .or_default()
556            .insert(block_ref);
557        newly_connected.push(block_ref);
558        // Traverse ancestors.
559        if let Some(block) = all_blocks.get(&block_ref) {
560            queue.extend(block.ancestors().iter().cloned());
561        }
562    }
563
564    // Generate random reject votes for newly connected blocks only.
565    let votes: Vec<_> = newly_connected
566        .iter()
567        .filter(|_| reject_percentage > 0)
568        .filter_map(|&block_ref| {
569            let rejects: Vec<_> = (0..num_transactions)
570                .filter(|_| rng.gen_range(0..100) < reject_percentage)
571                .map(|idx| idx as TransactionIndex)
572                .collect();
573            (!rejects.is_empty()).then_some(BlockTransactionVotes { block_ref, rejects })
574        })
575        .collect();
576
577    let transactions: Vec<_> = (0..num_transactions)
578        .map(|_| Transaction::new(vec![1_u8; 16]))
579        .collect();
580
581    let timestamp = (round as u64) * 1000 + (own_authority.value() as u64) + rng.gen_range(0..100);
582
583    VerifiedBlock::new_for_test(
584        TestBlock::new(round, own_authority.value() as u32)
585            .set_transactions(transactions)
586            .set_transaction_votes(votes)
587            .set_ancestors(ancestors)
588            .set_timestamp_ms(timestamp)
589            .build(),
590    )
591}
592
593/// Per-round state for iteration.
594#[derive(Clone, Default)]
595struct RoundState {
596    // Total stake of visited blocks in this round.
597    visited_stake: Stake,
598    // Indices of unvisited blocks in this round.
599    unvisited: Vec<usize>,
600}
601
602/// Iterator yielding blocks in constrained random order. Selects from rounds
603/// `completed_round + 1` to `quorum_round + max_step`, simulating arrival with delays.
604pub struct RandomDagIterator<'a> {
605    dag: &'a RandomDag,
606    rng: &'a mut StdRng,
607    quorum_threshold: Stake,
608    max_step: Round,
609    // Highest round where all prior rounds have quorum stake visited.
610    quorum_round: Round,
611    // Highest round where all prior rounds have all blocks visited.
612    completed_round: Round,
613    // State of each round.
614    round_states: Vec<RoundState>,
615    // Number of blocks remaining to visit.
616    num_remaining: usize,
617}
618
619impl<'a> RandomDagIterator<'a> {
620    fn new(dag: &'a RandomDag, rng: &'a mut StdRng, max_step: Round) -> Self {
621        let num_rounds = dag.num_rounds as usize;
622        let committee = &dag.context.committee;
623        let quorum_threshold = committee.quorum_threshold();
624
625        let mut round_states: Vec<RoundState> = vec![RoundState::default(); num_rounds + 1];
626
627        for (idx, block) in dag.blocks.iter().enumerate() {
628            let round = block.round() as usize;
629            round_states[round].unvisited.push(idx);
630        }
631
632        let num_remaining = dag.blocks.len();
633
634        Self {
635            dag,
636            rng,
637            max_step,
638            quorum_round: 0,
639            completed_round: 0,
640            quorum_threshold,
641            round_states,
642            num_remaining,
643        }
644    }
645}
646
647impl Iterator for RandomDagIterator<'_> {
648    type Item = VerifiedBlock;
649
650    /// The high level algorithm is to randomly select a block from unvisited blocks,
651    /// up to quorum_round + max_step round.
652    /// It is possible a sequence of blocks are suspended until their common ancestors
653    /// gets selected and accepted.
654    ///
655    /// An alternative approach is to keep track of selected blocks, and only select blocks without
656    /// missing dependencies. Even though block selection order is also randomized, this approach
657    /// seems to have less test coverage, and was unable to expose errors when incorrect logic was
658    /// introduced to CommitFinalizer.
659    fn next(&mut self) -> Option<Self::Item> {
660        if self.num_remaining == 0 {
661            return None;
662        }
663
664        // Eligible rounds: from first unvisited to quorum_round + max_step.
665        let min_round = self.completed_round as usize + 1;
666        let max_round =
667            ((self.quorum_round + self.max_step) as usize).min(self.round_states.len() - 1);
668        let eligible_rounds = min_round..=max_round;
669
670        let total_candidates: usize = eligible_rounds
671            .clone()
672            .map(|r| self.round_states[r].unvisited.len())
673            .sum();
674
675        if total_candidates == 0 {
676            return None;
677        }
678
679        // Select random candidate by index across eligible rounds.
680        let mut selection = self.rng.gen_range(0..total_candidates);
681        let mut selected_round = 0;
682        let mut selected_pos = 0;
683
684        for r in eligible_rounds {
685            let count = self.round_states[r].unvisited.len();
686            if selection < count {
687                selected_round = r;
688                selected_pos = selection;
689                break;
690            }
691            selection -= count;
692        }
693
694        // Get block index and remove from unvisited.
695        let block_idx = self.round_states[selected_round]
696            .unvisited
697            .swap_remove(selected_pos);
698        let block = self.dag.blocks[block_idx].clone();
699
700        // Update visited stake for this round.
701        let stake = self.dag.context.committee.stake(block.author());
702        self.round_states[selected_round].visited_stake += stake;
703        self.num_remaining -= 1;
704
705        // Advance completed_round while next round has all blocks visited.
706        while self
707            .round_states
708            .get(self.completed_round as usize + 1)
709            .is_some_and(|s| s.unvisited.is_empty())
710        {
711            self.completed_round += 1;
712        }
713
714        // Advance quorum_round while next round has quorum stake visited.
715        while self
716            .round_states
717            .get(self.quorum_round as usize + 1)
718            .is_some_and(|s| s.visited_stake >= self.quorum_threshold)
719        {
720            self.quorum_round += 1;
721        }
722
723        Some(block)
724    }
725}