consensus_core/
commit_test_fixture.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Shared test fixture for commit-related tests.
5//! Used by both commit_finalizer.rs tests and randomized_tests.rs.
6
7use std::collections::{BTreeMap, BTreeSet, VecDeque};
8use std::ops::Bound::Included;
9use std::sync::Arc;
10
11use consensus_config::{AuthorityIndex, Stake};
12use consensus_types::block::{BlockDigest, BlockRef};
13use consensus_types::block::{Round, TransactionIndex};
14use mysten_metrics::monitored_mpsc::unbounded_channel;
15use parking_lot::RwLock;
16use rand::prelude::SliceRandom;
17use rand::{Rng, rngs::StdRng};
18
19use crate::Transaction;
20use crate::block::{BlockTransactionVotes, TestBlock, genesis_blocks};
21use crate::{
22    block::{BlockAPI, VerifiedBlock},
23    block_manager::BlockManager,
24    block_verifier::NoopBlockVerifier,
25    commit::{CommittedSubDag, DecidedLeader},
26    commit_finalizer::CommitFinalizer,
27    context::Context,
28    dag_state::DagState,
29    leader_schedule::{LeaderSchedule, LeaderSwapTable},
30    linearizer::Linearizer,
31    storage::mem_store::MemStore,
32    transaction_certifier::TransactionCertifier,
33    universal_committer::{
34        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
35    },
36};
37
/// A test fixture that provides all the components needed for testing commit processing,
/// similar to the actual logic in Core::try_commit() and CommitFinalizer::run().
///
/// All components are built by `CommitTestFixture::new()` from one `Context` and share a
/// single `DagState`.
pub struct CommitTestFixture {
    /// Context that every component of the fixture was constructed with.
    pub context: Arc<Context>,
    /// Turns committed leader blocks into `CommittedSubDag`s in `process_commits()`.
    pub linearizer: Linearizer,
    /// Receives every block added through the fixture, together with the fixture's own
    /// reject votes for that block.
    pub transaction_certifier: TransactionCertifier,
    /// Processes the sub-dags produced by the linearizer and returns the finalized commits.
    pub commit_finalizer: CommitFinalizer,

    /// DAG state handed to the committer, block manager, linearizer, transaction certifier
    /// and commit finalizer at construction.
    dag_state: Arc<RwLock<DagState>>,
    /// Accepts blocks passed to `try_accept_blocks()`.
    block_manager: BlockManager,
    /// Decides leaders in `try_commit()`.
    committer: UniversalCommitter,
}
50
51impl CommitTestFixture {
52    /// Creates a new CommitTestFixture from a context.
53    pub fn new(context: Arc<Context>) -> Self {
54        let leader_schedule = Arc::new(LeaderSchedule::new(
55            context.clone(),
56            LeaderSwapTable::default(),
57        ));
58        let dag_state = Arc::new(RwLock::new(DagState::new(
59            context.clone(),
60            Arc::new(MemStore::new()),
61        )));
62
63        // Create committer with pipelining and only 1 leader per leader round
64        let committer =
65            UniversalCommitterBuilder::new(context.clone(), leader_schedule, dag_state.clone())
66                .with_pipeline(true)
67                .build();
68
69        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
70
71        let linearizer = Linearizer::new(context.clone(), dag_state.clone());
72        let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
73        let transaction_certifier = TransactionCertifier::new(
74            context.clone(),
75            Arc::new(NoopBlockVerifier {}),
76            dag_state.clone(),
77            blocks_sender,
78        );
79        let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
80        let commit_finalizer = CommitFinalizer::new(
81            context.clone(),
82            dag_state.clone(),
83            transaction_certifier.clone(),
84            commit_sender,
85        );
86
87        Self {
88            context,
89            linearizer,
90            transaction_certifier,
91            commit_finalizer,
92            dag_state,
93            block_manager,
94            committer,
95        }
96    }
97
98    /// Creates a new CommitTestFixture with more options.
99    pub fn with_options(
100        num_authorities: usize,
101        authority_index: u32,
102        gc_depth: Option<u32>,
103    ) -> Self {
104        Self::new(Self::context_with_options(
105            num_authorities,
106            authority_index,
107            gc_depth,
108        ))
109    }
110
111    pub fn context_with_options(
112        num_authorities: usize,
113        authority_index: u32,
114        gc_depth: Option<u32>,
115    ) -> Arc<Context> {
116        let (mut context, _keys) = Context::new_for_test(num_authorities);
117        if let Some(gc_depth) = gc_depth {
118            context.protocol_config.set_gc_depth_for_testing(gc_depth);
119        }
120        // Skipping equivocation validation is necessary when testing with equivocators.
121        // Also it is ok when only testing the commit code path.
122        context.parameters.internal.skip_equivocation_validation = true;
123        Arc::new(context.with_authority_index(AuthorityIndex::new_for_test(authority_index)))
124    }
125
126    // Adds the blocks to the transaction certifier and then tries to accept them via BlockManager.
127    /// This registers the blocks for reject vote tracking (with no reject votes).
128    pub fn try_accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
129        self.transaction_certifier
130            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
131        self.block_manager.try_accept_blocks(blocks);
132    }
133
134    /// Adds blocks to the transaction certifier and dag state.
135    /// This registers the blocks for reject vote tracking (with no reject votes).
136    pub fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
137        let blocks_and_votes = blocks.iter().map(|b| (b.clone(), vec![])).collect();
138        self.transaction_certifier
139            .add_voted_blocks(blocks_and_votes);
140        self.dag_state.write().accept_blocks(blocks);
141    }
142
143    pub fn add_blocks_with_own_votes(
144        &self,
145        blocks_and_votes: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
146    ) {
147        let blocks = blocks_and_votes.iter().map(|(b, _)| b.clone()).collect();
148        self.transaction_certifier
149            .add_voted_blocks(blocks_and_votes);
150        self.dag_state.write().accept_blocks(blocks);
151    }
152
153    /// Checks if the block manager has no suspended blocks.
154    #[cfg(test)]
155    pub(crate) fn has_no_suspended_blocks(&self) -> bool {
156        self.block_manager.is_empty()
157    }
158
159    /// Tries to decide leaders, process and finalize commits, returning finalized commits
160    /// and the updated last_decided slot.
161    pub async fn try_commit(
162        &mut self,
163        last_decided: crate::block::Slot,
164    ) -> (Vec<CommittedSubDag>, crate::block::Slot) {
165        let sequence = self.committer.try_decide(last_decided);
166        let new_last_decided = sequence
167            .last()
168            .map(|leader| leader.slot())
169            .unwrap_or(last_decided);
170        let finalized = self.process_commits(sequence).await;
171        (finalized, new_last_decided)
172    }
173
174    /// Process decided leaders through linearizer and commit finalizer,
175    /// similar to Core::try_commit() and CommitFinalizer::run().
176    ///
177    /// This extracts leader blocks from DecidedLeader::Commit, creates CommittedSubDags
178    /// via the linearizer, and processes them through the commit finalizer.
179    pub(crate) async fn process_commits(
180        &mut self,
181        sequence: Vec<DecidedLeader>,
182    ) -> Vec<CommittedSubDag> {
183        // Extract leader blocks from DecidedLeader::Commit (skip Skip decisions)
184        let leaders: Vec<VerifiedBlock> = sequence
185            .into_iter()
186            .filter_map(|d| match d {
187                DecidedLeader::Commit(block, _) => Some(block),
188                DecidedLeader::Skip(_) => None,
189            })
190            .collect();
191
192        if leaders.is_empty() {
193            return vec![];
194        }
195
196        // Use linearizer to create CommittedSubDag
197        let committed_sub_dags = self.linearizer.handle_commit(leaders);
198
199        // After handle_commit(), the GC round is updated. We need to unsuspend any blocks that were
200        // suspended because of missing ancestors that are now GC'ed.
201        self.block_manager
202            .try_unsuspend_blocks_for_latest_gc_round();
203
204        // Process through commit finalizer
205        let mut finalized_commits = vec![];
206        for mut subdag in committed_sub_dags {
207            subdag.decided_with_local_blocks = true;
208            let finalized = self.commit_finalizer.process_commit(subdag).await;
209            finalized_commits.extend(finalized);
210        }
211
212        finalized_commits
213    }
214}
215
216/// Compare commit sequences across all runs, asserting they are identical.
217/// Returns the shortest commit sequence for additional assertions if needed.
218pub fn assert_commit_sequences_match(
219    commit_sequences: Vec<Vec<CommittedSubDag>>,
220) -> Vec<CommittedSubDag> {
221    let (shortest_idx, shortest_sequence) = commit_sequences
222        .iter()
223        .enumerate()
224        .min_by_key(|(_, seq)| seq.len())
225        .expect("commit_sequences should not be empty");
226
227    for (run, commit_sequence) in commit_sequences.iter().enumerate() {
228        // Since INDIRECT_REJECT_DEPTH is 3, the maximum number of commits buffered in CommitFinalizer is 3.
229        // And because of the direct finalization optimization, it might happen the last 3 commits are pending in
230        // one run but all finalized in another.
231        assert!(
232            commit_sequence.len() <= shortest_sequence.len() + 3,
233            "Commit sequence at run {run} is more than 3 commits longer than shortest (run {shortest_idx}): {} vs {}",
234            commit_sequence.len(),
235            shortest_sequence.len()
236        );
237
238        for (commit_index, (c1, c2)) in commit_sequence
239            .iter()
240            .zip(shortest_sequence.iter())
241            .enumerate()
242        {
243            assert_eq!(
244                c1.leader, c2.leader,
245                "Leader mismatch at run {run} commit {commit_index}"
246            );
247            assert_eq!(
248                c1.commit_ref, c2.commit_ref,
249                "Commit sequence mismatch at run {run} commit {commit_index}"
250            );
251            assert_eq!(
252                c1.rejected_transactions_by_block, c2.rejected_transactions_by_block,
253                "Rejected transactions mismatch at run {run} commit {commit_index}"
254            );
255        }
256    }
257
258    let mut total_transactions = 0;
259    let mut rejected_transactions = 0;
260    let mut reject_votes = 0;
261    let mut blocks = 4;
262    for commit in shortest_sequence.iter() {
263        total_transactions += commit
264            .blocks
265            .iter()
266            .map(|block| block.transactions().len())
267            .sum::<usize>();
268        rejected_transactions += commit
269            .rejected_transactions_by_block
270            .values()
271            .map(|transactions| transactions.len())
272            .sum::<usize>();
273        reject_votes += commit
274            .blocks
275            .iter()
276            .map(|block| block.transaction_votes().len())
277            .sum::<usize>();
278        blocks += commit.blocks.len();
279    }
280
281    tracing::info!(
282        "Finished comparing commit sequences. Commits: {}, Blocks: {}, Total transactions: {}, Rejected transactions: {}, Reject votes: {}",
283        shortest_sequence.len(),
284        blocks,
285        total_transactions,
286        rejected_transactions,
287        reject_votes
288    );
289
290    shortest_sequence.clone()
291}
292
293// ---- RandomDag, RandomDagConfig and RandomDagIterator ----
294
/// Configuration for generating a randomized DAG.
pub struct RandomDagConfig {
    /// Number of distinct authorities creating blocks.
    ///
    /// `RandomDag::new` does not read this field; it takes the authorities from the committee
    /// of the `Context` it is given.
    pub num_authorities: usize,
    /// Number of rounds to generate.
    pub num_rounds: Round,
    /// Number of transactions per block.
    pub num_transactions: u32,
    /// Percentage chance of transactions to randomly reject.
    ///
    /// Each transaction of a newly linked block gets a reject vote when a sample from
    /// `0..100` is below this value. With 0, no reject votes are generated.
    pub reject_percentage: u8,
    /// Each element specifies the authority index, and the number of equivocators
    /// that are acting under this authority.
    ///
    /// Equivocators are added on top of the one instance that every committee authority gets.
    pub equivocators: Vec<(AuthorityIndex, u16)>,
}
309
/// A randomly generated DAG for testing commit patterns with reject votes.
pub struct RandomDag {
    /// Context whose committee supplies stakes and the quorum threshold.
    context: Arc<Context>,
    /// Blocks of the DAG. When built by `new()`, this holds the genesis blocks and all
    /// generated blocks in `BlockRef` order; `from_blocks()` stores the blocks as given.
    pub blocks: Vec<VerifiedBlock>,
    /// Number of rounds generated by `new()`, or the highest block round for `from_blocks()`.
    num_rounds: Round,
}
316
317impl RandomDag {
318    /// Creates a new RandomDag with generated blocks containing transactions and reject votes.
319    pub fn new(context: Arc<Context>, rng: &mut StdRng, config: RandomDagConfig) -> Self {
320        let RandomDagConfig {
321            num_authorities: _,
322            num_rounds,
323            num_transactions,
324            reject_percentage,
325            equivocators,
326        } = config;
327
328        let committee = &context.committee;
329        let quorum_threshold = committee.quorum_threshold();
330        let total_stake = committee.total_stake();
331
332        // Create instance ID for each authority and equivocators.
333        let mut instances: Vec<InstanceID> = committee.authorities().map(|(a, _)| (a, 0)).collect();
334        for (authority, num_equivocators) in equivocators {
335            for i in 1..=num_equivocators {
336                instances.push((authority, i));
337            }
338        }
339
340        let genesis_blocks = genesis_blocks(&context)
341            .into_iter()
342            .map(|b| (b.author(), b))
343            .collect::<BTreeMap<AuthorityIndex, VerifiedBlock>>();
344        let last_round_blocks: Vec<VerifiedBlock> = genesis_blocks.values().cloned().collect();
345
346        // Store all blocks for lookup and range search.
347        let mut all_blocks: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
348        // Initialize with genesis blocks.
349        for block in &last_round_blocks {
350            all_blocks.insert(block.reference(), block.clone());
351        }
352
353        // Track the latest block per instance. Equivocators start from the same genesis blocks per authority.
354        let mut latest_blocks = instances
355            .iter()
356            .map(|&(a, i)| {
357                let b = genesis_blocks.get(&a).unwrap();
358                ((a, i), b.clone())
359            })
360            .collect::<BTreeMap<InstanceID, VerifiedBlock>>();
361
362        // Track included blocks per instance (simulates link_causal_history).
363        let mut included_refs = BTreeMap::<InstanceID, BTreeSet<BlockRef>>::new();
364
365        for r in 1..=num_rounds {
366            // Select random quorum-or-more stake to produce blocks this round.
367            let target_stake = rng.gen_range(quorum_threshold..=total_stake);
368
369            // Select random instances to produce blocks this round.
370            // Equivocations are allowed when multiple instances from the same authority propose at
371            // the same slot.
372            let mut proposers = instances.clone();
373            proposers.shuffle(rng);
374            let mut selected_stake = 0;
375            // Ensure stake across equivocations are counted only once.
376            let mut selected_authorities = vec![false; committee.size()];
377            let selected_proposers: Vec<_> = proposers
378                .into_iter()
379                .take_while(|instance| {
380                    if selected_stake >= target_stake {
381                        return false;
382                    }
383                    if !selected_authorities[instance.0.value()] {
384                        selected_authorities[instance.0.value()] = true;
385                        selected_stake += committee.stake(instance.0);
386                    }
387                    true
388                })
389                .collect();
390
391            let mut current_round_blocks = Vec::new();
392            for instance_id in selected_proposers {
393                let block = build_block_for_instance(
394                    &context,
395                    &instances,
396                    rng,
397                    r,
398                    instance_id,
399                    num_transactions,
400                    reject_percentage,
401                    &all_blocks,
402                    &mut latest_blocks,
403                    &mut included_refs,
404                );
405                current_round_blocks.push((instance_id, block));
406            }
407
408            // Update state with current round blocks.
409            for (instance_id, block) in current_round_blocks {
410                all_blocks.insert(block.reference(), block.clone());
411                latest_blocks.insert(instance_id, block);
412            }
413        }
414
415        RandomDag {
416            context,
417            blocks: all_blocks.values().cloned().collect(),
418            num_rounds,
419        }
420    }
421
422    /// Creates a RandomDag from existing blocks.
423    pub fn from_blocks(context: Arc<Context>, blocks: Vec<VerifiedBlock>) -> Self {
424        let num_rounds = blocks.iter().map(|b| b.round()).max().unwrap_or(0);
425        RandomDag {
426            context,
427            blocks,
428            num_rounds,
429        }
430    }
431
432    /// Creates an iterator yielding blocks in constrained random order.
433    pub fn random_iter<'a>(
434        &'a self,
435        rng: &'a mut StdRng,
436        max_step: Round,
437    ) -> RandomDagIterator<'a> {
438        RandomDagIterator::new(self, rng, max_step)
439    }
440}
441
/// Identifies a consensus instance, by its authority index and another index to differentiate
/// between equivocators.
///
/// `RandomDag::new` gives every committee authority an instance with index 0 and numbers the
/// configured equivocators of an authority from 1.
type InstanceID = (AuthorityIndex, u16);
444
445/// Builds a single block for the given consensus instance at the specified round.
446fn build_block_for_instance(
447    context: &Arc<Context>,
448    instances: &[InstanceID],
449    rng: &mut StdRng,
450    round: Round,
451    own_instance: InstanceID,
452    num_transactions: u32,
453    reject_percentage: u8,
454    all_blocks: &BTreeMap<BlockRef, VerifiedBlock>,
455    latest_blocks: &mut BTreeMap<InstanceID, VerifiedBlock>,
456    included_refs: &mut BTreeMap<InstanceID, BTreeSet<BlockRef>>,
457) -> VerifiedBlock {
458    let committee = &context.committee;
459    let quorum_threshold = committee.quorum_threshold();
460    let own_authority = own_instance.0;
461
462    // Select blocks from the previous round until quorum stake is reached.
463    let prev_round = round - 1;
464    let mut prev_round_blocks: Vec<_> = all_blocks
465        .range((
466            Included(BlockRef::new(
467                prev_round,
468                AuthorityIndex::MIN,
469                BlockDigest::MIN,
470            )),
471            Included(BlockRef::new(
472                prev_round,
473                AuthorityIndex::MAX,
474                BlockDigest::MAX,
475            )),
476        ))
477        .map(|(_, b)| b)
478        .cloned()
479        .collect();
480    prev_round_blocks.shuffle(rng);
481    let mut parent_stake: Stake = 0;
482    // Ensure only one block gets selected per authority, regardless of equivocations.
483    let mut selected_authorities = vec![false; committee.size()];
484    let quorum_parents: Vec<_> = prev_round_blocks
485        .into_iter()
486        .filter_map(|b| {
487            if parent_stake >= quorum_threshold {
488                return None;
489            }
490            if selected_authorities[b.author().value()] {
491                return None;
492            }
493            selected_authorities[b.author().value()] = true;
494            parent_stake += committee.stake(b.author());
495            Some(b)
496        })
497        .collect();
498
499    // Find so far unselected instances.
500    let mut unselected_instances: Vec<_> = instances
501        .iter()
502        .filter(|(authority, _)| !selected_authorities[authority.value()])
503        .cloned()
504        .collect();
505
506    // Randomly select extra blocks to link to among ancestors.
507    unselected_instances.shuffle(rng);
508    // Use min of two uniform samples to bias toward fewer additional ancestors
509    // while maintaining non-zero probability for all counts.
510    let extra_count = rng
511        .gen_range(0..=unselected_instances.len())
512        .min(rng.gen_range(0..=unselected_instances.len()));
513    let additional_ancestor_blocks: Vec<_> = unselected_instances[0..extra_count]
514        .iter()
515        .filter_map(|&(authority, instance)| {
516            if selected_authorities[authority.value()] {
517                return None;
518            }
519            let block = latest_blocks.get(&(authority, instance))?;
520            assert!(
521                block.round() < round,
522                "latest_blocks should only contain blocks from previous rounds"
523            );
524            selected_authorities[authority.value()] = true;
525            Some(block.clone())
526        })
527        .collect();
528
529    // Combine ancestors: quorum parents + extra ancestors from unselected authorities + own latest block if necessary.
530    let mut ancestor_blocks = quorum_parents;
531    ancestor_blocks.extend(additional_ancestor_blocks);
532    if !ancestor_blocks.iter().any(|b| b.author() == own_authority) {
533        ancestor_blocks.push(latest_blocks[&own_instance].clone());
534    }
535    let ancestors: Vec<_> = ancestor_blocks.iter().map(|b| b.reference()).collect();
536
537    // Find newly connected blocks via BFS (similar to link_causal_history).
538    let mut newly_connected = Vec::new();
539    let mut queue = VecDeque::from_iter(ancestors.iter().copied());
540    while let Some(block_ref) = queue.pop_front() {
541        if block_ref.round == 0 {
542            continue; // Skip genesis blocks.
543        }
544        if included_refs
545            .entry(own_instance)
546            .or_default()
547            .contains(&block_ref)
548        {
549            continue; // Already included.
550        }
551        included_refs
552            .entry(own_instance)
553            .or_default()
554            .insert(block_ref);
555        newly_connected.push(block_ref);
556        // Traverse ancestors.
557        if let Some(block) = all_blocks.get(&block_ref) {
558            queue.extend(block.ancestors().iter().cloned());
559        }
560    }
561
562    // Generate random reject votes for newly connected blocks only.
563    let votes: Vec<_> = newly_connected
564        .iter()
565        .filter(|_| reject_percentage > 0)
566        .filter_map(|&block_ref| {
567            let rejects: Vec<_> = (0..num_transactions)
568                .filter(|_| rng.gen_range(0..100) < reject_percentage)
569                .map(|idx| idx as TransactionIndex)
570                .collect();
571            (!rejects.is_empty()).then_some(BlockTransactionVotes { block_ref, rejects })
572        })
573        .collect();
574
575    let transactions: Vec<_> = (0..num_transactions)
576        .map(|_| Transaction::new(vec![1_u8; 16]))
577        .collect();
578
579    let timestamp = (round as u64) * 1000 + (own_authority.value() as u64) + rng.gen_range(0..100);
580
581    VerifiedBlock::new_for_test(
582        TestBlock::new(round, own_authority.value() as u32)
583            .set_transactions(transactions)
584            .set_transaction_votes(votes)
585            .set_ancestors(ancestors)
586            .set_timestamp_ms(timestamp)
587            .build(),
588    )
589}
590
/// Per-round state for iteration.
#[derive(Clone, Default)]
struct RoundState {
    /// Total stake of visited blocks in this round.
    ///
    /// The author's stake is added for every yielded block, so multiple blocks from the same
    /// authority in one round are each counted.
    visited_stake: Stake,
    /// Indices of unvisited blocks in this round. These index into `RandomDag::blocks`.
    unvisited: Vec<usize>,
}
599
/// Iterator yielding blocks in constrained random order. Selects from rounds
/// `completed_round + 1` to `quorum_round + max_step`, simulating arrival with delays.
pub struct RandomDagIterator<'a> {
    /// The DAG whose blocks are yielded.
    dag: &'a RandomDag,
    /// Source of randomness for picking the next block.
    rng: &'a mut StdRng,
    /// Quorum threshold of the committee in the DAG's context.
    quorum_threshold: Stake,
    /// Maximum number of rounds beyond `quorum_round` from which blocks may be selected.
    max_step: Round,
    /// Highest round where all prior rounds have quorum stake visited.
    quorum_round: Round,
    /// Highest round where all prior rounds have all blocks visited.
    completed_round: Round,
    /// State of each round, indexed by round number.
    round_states: Vec<RoundState>,
    /// Number of blocks remaining to visit.
    num_remaining: usize,
}
616
617impl<'a> RandomDagIterator<'a> {
618    fn new(dag: &'a RandomDag, rng: &'a mut StdRng, max_step: Round) -> Self {
619        let num_rounds = dag.num_rounds as usize;
620        let committee = &dag.context.committee;
621        let quorum_threshold = committee.quorum_threshold();
622
623        let mut round_states: Vec<RoundState> = vec![RoundState::default(); num_rounds + 1];
624
625        for (idx, block) in dag.blocks.iter().enumerate() {
626            let round = block.round() as usize;
627            round_states[round].unvisited.push(idx);
628        }
629
630        let num_remaining = dag.blocks.len();
631
632        Self {
633            dag,
634            rng,
635            max_step,
636            quorum_round: 0,
637            completed_round: 0,
638            quorum_threshold,
639            round_states,
640            num_remaining,
641        }
642    }
643}
644
645impl Iterator for RandomDagIterator<'_> {
646    type Item = VerifiedBlock;
647
648    /// The high level algorithm is to randomly select a block from unvisited blocks,
649    /// up to quorum_round + max_step round.
650    /// It is possible a sequence of blocks are suspended until their common ancestors
651    /// gets selected and accepted.
652    ///
653    /// An alternative approach is to keep track of selected blocks, and only select blocks without
654    /// missing dependencies. Even though block selection order is also randomized, this approach
655    /// seems to have less test coverage, and was unable to expose errors when incorrect logic was
656    /// introduced to CommitFinalizer.
657    fn next(&mut self) -> Option<Self::Item> {
658        if self.num_remaining == 0 {
659            return None;
660        }
661
662        // Eligible rounds: from first unvisited to quorum_round + max_step.
663        let min_round = self.completed_round as usize + 1;
664        let max_round =
665            ((self.quorum_round + self.max_step) as usize).min(self.round_states.len() - 1);
666        let eligible_rounds = min_round..=max_round;
667
668        let total_candidates: usize = eligible_rounds
669            .clone()
670            .map(|r| self.round_states[r].unvisited.len())
671            .sum();
672
673        if total_candidates == 0 {
674            return None;
675        }
676
677        // Select random candidate by index across eligible rounds.
678        let mut selection = self.rng.gen_range(0..total_candidates);
679        let mut selected_round = 0;
680        let mut selected_pos = 0;
681
682        for r in eligible_rounds {
683            let count = self.round_states[r].unvisited.len();
684            if selection < count {
685                selected_round = r;
686                selected_pos = selection;
687                break;
688            }
689            selection -= count;
690        }
691
692        // Get block index and remove from unvisited.
693        let block_idx = self.round_states[selected_round]
694            .unvisited
695            .swap_remove(selected_pos);
696        let block = self.dag.blocks[block_idx].clone();
697
698        // Update visited stake for this round.
699        let stake = self.dag.context.committee.stake(block.author());
700        self.round_states[selected_round].visited_stake += stake;
701        self.num_remaining -= 1;
702
703        // Advance completed_round while next round has all blocks visited.
704        while self
705            .round_states
706            .get(self.completed_round as usize + 1)
707            .is_some_and(|s| s.unvisited.is_empty())
708        {
709            self.completed_round += 1;
710        }
711
712        // Advance quorum_round while next round has quorum stake visited.
713        while self
714            .round_states
715            .get(self.quorum_round as usize + 1)
716            .is_some_and(|s| s.visited_stake >= self.quorum_threshold)
717        {
718            self.quorum_round += 1;
719        }
720
721        Some(block)
722    }
723}