1use std::collections::{BTreeMap, BTreeSet, VecDeque};
8use std::ops::Bound::Included;
9use std::sync::Arc;
10
11use consensus_config::{AuthorityIndex, Stake};
12use consensus_types::block::{BlockDigest, BlockRef};
13use consensus_types::block::{Round, TransactionIndex};
14use mysten_metrics::monitored_mpsc::unbounded_channel;
15use parking_lot::RwLock;
16use rand::prelude::SliceRandom;
17use rand::{Rng, rngs::StdRng};
18
19use crate::Transaction;
20use crate::block::{BlockTransactionVotes, TestBlock, genesis_blocks};
21use crate::{
22 block::{BlockAPI, VerifiedBlock},
23 block_manager::BlockManager,
24 block_verifier::NoopBlockVerifier,
25 commit::{CommittedSubDag, DecidedLeader},
26 commit_finalizer::CommitFinalizer,
27 context::Context,
28 dag_state::DagState,
29 leader_schedule::{LeaderSchedule, LeaderSwapTable},
30 linearizer::Linearizer,
31 storage::mem_store::MemStore,
32 transaction_certifier::TransactionCertifier,
33 universal_committer::{
34 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
35 },
36};
37
38pub struct CommitTestFixture {
41 pub context: Arc<Context>,
42 pub linearizer: Linearizer,
43 pub transaction_certifier: TransactionCertifier,
44 pub commit_finalizer: CommitFinalizer,
45
46 dag_state: Arc<RwLock<DagState>>,
47 block_manager: BlockManager,
48 committer: UniversalCommitter,
49}
50
51impl CommitTestFixture {
52 pub fn new(context: Arc<Context>) -> Self {
54 let leader_schedule = Arc::new(LeaderSchedule::new(
55 context.clone(),
56 LeaderSwapTable::default(),
57 ));
58 let dag_state = Arc::new(RwLock::new(DagState::new(
59 context.clone(),
60 Arc::new(MemStore::new()),
61 )));
62
63 let committer =
65 UniversalCommitterBuilder::new(context.clone(), leader_schedule, dag_state.clone())
66 .with_pipeline(true)
67 .build();
68
69 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
70
71 let linearizer = Linearizer::new(context.clone(), dag_state.clone());
72 let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
73 let transaction_certifier = TransactionCertifier::new(
74 context.clone(),
75 Arc::new(NoopBlockVerifier {}),
76 dag_state.clone(),
77 blocks_sender,
78 );
79 let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
80 let commit_finalizer = CommitFinalizer::new(
81 context.clone(),
82 dag_state.clone(),
83 transaction_certifier.clone(),
84 commit_sender,
85 );
86
87 Self {
88 context,
89 linearizer,
90 transaction_certifier,
91 commit_finalizer,
92 dag_state,
93 block_manager,
94 committer,
95 }
96 }
97
98 pub fn with_options(
100 num_authorities: usize,
101 authority_index: u32,
102 gc_depth: Option<u32>,
103 ) -> Self {
104 Self::new(Self::context_with_options(
105 num_authorities,
106 authority_index,
107 gc_depth,
108 ))
109 }
110
111 pub fn context_with_options(
112 num_authorities: usize,
113 authority_index: u32,
114 gc_depth: Option<u32>,
115 ) -> Arc<Context> {
116 let (mut context, _keys) = Context::new_for_test(num_authorities);
117 if let Some(gc_depth) = gc_depth {
118 context.protocol_config.set_gc_depth_for_testing(gc_depth);
119 }
120 context.parameters.internal.skip_equivocation_validation = true;
123 Arc::new(context.with_authority_index(AuthorityIndex::new_for_test(authority_index)))
124 }
125
126 pub fn try_accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
129 self.transaction_certifier
130 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
131 self.block_manager.try_accept_blocks(blocks);
132 }
133
134 pub fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
137 let blocks_and_votes = blocks.iter().map(|b| (b.clone(), vec![])).collect();
138 self.transaction_certifier
139 .add_voted_blocks(blocks_and_votes);
140 self.dag_state.write().accept_blocks(blocks);
141 }
142
143 pub fn add_blocks_with_own_votes(
144 &self,
145 blocks_and_votes: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
146 ) {
147 let blocks = blocks_and_votes.iter().map(|(b, _)| b.clone()).collect();
148 self.transaction_certifier
149 .add_voted_blocks(blocks_and_votes);
150 self.dag_state.write().accept_blocks(blocks);
151 }
152
153 #[cfg(test)]
155 pub(crate) fn has_no_suspended_blocks(&self) -> bool {
156 self.block_manager.is_empty()
157 }
158
159 pub async fn try_commit(
162 &mut self,
163 last_decided: crate::block::Slot,
164 ) -> (Vec<CommittedSubDag>, crate::block::Slot) {
165 let sequence = self.committer.try_decide(last_decided);
166 let new_last_decided = sequence
167 .last()
168 .map(|leader| leader.slot())
169 .unwrap_or(last_decided);
170 let finalized = self.process_commits(sequence).await;
171 (finalized, new_last_decided)
172 }
173
174 pub(crate) async fn process_commits(
180 &mut self,
181 sequence: Vec<DecidedLeader>,
182 ) -> Vec<CommittedSubDag> {
183 let leaders: Vec<VerifiedBlock> = sequence
185 .into_iter()
186 .filter_map(|d| match d {
187 DecidedLeader::Commit(block, _) => Some(block),
188 DecidedLeader::Skip(_) => None,
189 })
190 .collect();
191
192 if leaders.is_empty() {
193 return vec![];
194 }
195
196 let committed_sub_dags = self.linearizer.handle_commit(leaders);
198
199 self.block_manager
202 .try_unsuspend_blocks_for_latest_gc_round();
203
204 let mut finalized_commits = vec![];
206 for mut subdag in committed_sub_dags {
207 subdag.decided_with_local_blocks = true;
208 let finalized = self.commit_finalizer.process_commit(subdag).await;
209 finalized_commits.extend(finalized);
210 }
211
212 finalized_commits
213 }
214}
215
216pub fn assert_commit_sequences_match(
219 commit_sequences: Vec<Vec<CommittedSubDag>>,
220) -> Vec<CommittedSubDag> {
221 let (shortest_idx, shortest_sequence) = commit_sequences
222 .iter()
223 .enumerate()
224 .min_by_key(|(_, seq)| seq.len())
225 .expect("commit_sequences should not be empty");
226
227 for (run, commit_sequence) in commit_sequences.iter().enumerate() {
228 assert!(
232 commit_sequence.len() <= shortest_sequence.len() + 3,
233 "Commit sequence at run {run} is more than 3 commits longer than shortest (run {shortest_idx}): {} vs {}",
234 commit_sequence.len(),
235 shortest_sequence.len()
236 );
237
238 for (commit_index, (c1, c2)) in commit_sequence
239 .iter()
240 .zip(shortest_sequence.iter())
241 .enumerate()
242 {
243 assert_eq!(
244 c1.leader, c2.leader,
245 "Leader mismatch at run {run} commit {commit_index}"
246 );
247 assert_eq!(
248 c1.commit_ref, c2.commit_ref,
249 "Commit sequence mismatch at run {run} commit {commit_index}"
250 );
251 assert_eq!(
252 c1.rejected_transactions_by_block, c2.rejected_transactions_by_block,
253 "Rejected transactions mismatch at run {run} commit {commit_index}"
254 );
255 }
256 }
257
258 let mut total_transactions = 0;
259 let mut rejected_transactions = 0;
260 let mut reject_votes = 0;
261 let mut blocks = 4;
262 for commit in shortest_sequence.iter() {
263 total_transactions += commit
264 .blocks
265 .iter()
266 .map(|block| block.transactions().len())
267 .sum::<usize>();
268 rejected_transactions += commit
269 .rejected_transactions_by_block
270 .values()
271 .map(|transactions| transactions.len())
272 .sum::<usize>();
273 reject_votes += commit
274 .blocks
275 .iter()
276 .map(|block| block.transaction_votes().len())
277 .sum::<usize>();
278 blocks += commit.blocks.len();
279 }
280
281 tracing::info!(
282 "Finished comparing commit sequences. Commits: {}, Blocks: {}, Total transactions: {}, Rejected transactions: {}, Reject votes: {}",
283 shortest_sequence.len(),
284 blocks,
285 total_transactions,
286 rejected_transactions,
287 reject_votes
288 );
289
290 shortest_sequence.clone()
291}
292
293pub struct RandomDagConfig {
297 pub num_authorities: usize,
299 pub num_rounds: Round,
301 pub num_transactions: u32,
303 pub reject_percentage: u8,
305 pub equivocators: Vec<(AuthorityIndex, u16)>,
308}
309
310pub struct RandomDag {
312 context: Arc<Context>,
313 pub blocks: Vec<VerifiedBlock>,
314 num_rounds: Round,
315}
316
317impl RandomDag {
318 pub fn new(context: Arc<Context>, rng: &mut StdRng, config: RandomDagConfig) -> Self {
320 let RandomDagConfig {
321 num_authorities: _,
322 num_rounds,
323 num_transactions,
324 reject_percentage,
325 equivocators,
326 } = config;
327
328 let committee = &context.committee;
329 let quorum_threshold = committee.quorum_threshold();
330 let total_stake = committee.total_stake();
331
332 let mut instances: Vec<InstanceID> = committee.authorities().map(|(a, _)| (a, 0)).collect();
334 for (authority, num_equivocators) in equivocators {
335 for i in 1..=num_equivocators {
336 instances.push((authority, i));
337 }
338 }
339
340 let genesis_blocks = genesis_blocks(&context)
341 .into_iter()
342 .map(|b| (b.author(), b))
343 .collect::<BTreeMap<AuthorityIndex, VerifiedBlock>>();
344 let last_round_blocks: Vec<VerifiedBlock> = genesis_blocks.values().cloned().collect();
345
346 let mut all_blocks: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
348 for block in &last_round_blocks {
350 all_blocks.insert(block.reference(), block.clone());
351 }
352
353 let mut latest_blocks = instances
355 .iter()
356 .map(|&(a, i)| {
357 let b = genesis_blocks.get(&a).unwrap();
358 ((a, i), b.clone())
359 })
360 .collect::<BTreeMap<InstanceID, VerifiedBlock>>();
361
362 let mut included_refs = BTreeMap::<InstanceID, BTreeSet<BlockRef>>::new();
364
365 for r in 1..=num_rounds {
366 let target_stake = rng.gen_range(quorum_threshold..=total_stake);
368
369 let mut proposers = instances.clone();
373 proposers.shuffle(rng);
374 let mut selected_stake = 0;
375 let mut selected_authorities = vec![false; committee.size()];
377 let selected_proposers: Vec<_> = proposers
378 .into_iter()
379 .take_while(|instance| {
380 if selected_stake >= target_stake {
381 return false;
382 }
383 if !selected_authorities[instance.0.value()] {
384 selected_authorities[instance.0.value()] = true;
385 selected_stake += committee.stake(instance.0);
386 }
387 true
388 })
389 .collect();
390
391 let mut current_round_blocks = Vec::new();
392 for instance_id in selected_proposers {
393 let block = build_block_for_instance(
394 &context,
395 &instances,
396 rng,
397 r,
398 instance_id,
399 num_transactions,
400 reject_percentage,
401 &all_blocks,
402 &mut latest_blocks,
403 &mut included_refs,
404 );
405 current_round_blocks.push((instance_id, block));
406 }
407
408 for (instance_id, block) in current_round_blocks {
410 all_blocks.insert(block.reference(), block.clone());
411 latest_blocks.insert(instance_id, block);
412 }
413 }
414
415 RandomDag {
416 context,
417 blocks: all_blocks.values().cloned().collect(),
418 num_rounds,
419 }
420 }
421
422 pub fn from_blocks(context: Arc<Context>, blocks: Vec<VerifiedBlock>) -> Self {
424 let num_rounds = blocks.iter().map(|b| b.round()).max().unwrap_or(0);
425 RandomDag {
426 context,
427 blocks,
428 num_rounds,
429 }
430 }
431
432 pub fn random_iter<'a>(
434 &'a self,
435 rng: &'a mut StdRng,
436 max_step: Round,
437 ) -> RandomDagIterator<'a> {
438 RandomDagIterator::new(self, rng, max_step)
439 }
440}
441
442type InstanceID = (AuthorityIndex, u16);
444
445fn build_block_for_instance(
447 context: &Arc<Context>,
448 instances: &[InstanceID],
449 rng: &mut StdRng,
450 round: Round,
451 own_instance: InstanceID,
452 num_transactions: u32,
453 reject_percentage: u8,
454 all_blocks: &BTreeMap<BlockRef, VerifiedBlock>,
455 latest_blocks: &mut BTreeMap<InstanceID, VerifiedBlock>,
456 included_refs: &mut BTreeMap<InstanceID, BTreeSet<BlockRef>>,
457) -> VerifiedBlock {
458 let committee = &context.committee;
459 let quorum_threshold = committee.quorum_threshold();
460 let own_authority = own_instance.0;
461
462 let prev_round = round - 1;
464 let mut prev_round_blocks: Vec<_> = all_blocks
465 .range((
466 Included(BlockRef::new(
467 prev_round,
468 AuthorityIndex::MIN,
469 BlockDigest::MIN,
470 )),
471 Included(BlockRef::new(
472 prev_round,
473 AuthorityIndex::MAX,
474 BlockDigest::MAX,
475 )),
476 ))
477 .map(|(_, b)| b)
478 .cloned()
479 .collect();
480 prev_round_blocks.shuffle(rng);
481 let mut parent_stake: Stake = 0;
482 let mut selected_authorities = vec![false; committee.size()];
484 let quorum_parents: Vec<_> = prev_round_blocks
485 .into_iter()
486 .filter_map(|b| {
487 if parent_stake >= quorum_threshold {
488 return None;
489 }
490 if selected_authorities[b.author().value()] {
491 return None;
492 }
493 selected_authorities[b.author().value()] = true;
494 parent_stake += committee.stake(b.author());
495 Some(b)
496 })
497 .collect();
498
499 let mut unselected_instances: Vec<_> = instances
501 .iter()
502 .filter(|(authority, _)| !selected_authorities[authority.value()])
503 .cloned()
504 .collect();
505
506 unselected_instances.shuffle(rng);
508 let extra_count = rng
511 .gen_range(0..=unselected_instances.len())
512 .min(rng.gen_range(0..=unselected_instances.len()));
513 let additional_ancestor_blocks: Vec<_> = unselected_instances[0..extra_count]
514 .iter()
515 .filter_map(|&(authority, instance)| {
516 if selected_authorities[authority.value()] {
517 return None;
518 }
519 let block = latest_blocks.get(&(authority, instance))?;
520 assert!(
521 block.round() < round,
522 "latest_blocks should only contain blocks from previous rounds"
523 );
524 selected_authorities[authority.value()] = true;
525 Some(block.clone())
526 })
527 .collect();
528
529 let mut ancestor_blocks = quorum_parents;
531 ancestor_blocks.extend(additional_ancestor_blocks);
532 if !ancestor_blocks.iter().any(|b| b.author() == own_authority) {
533 ancestor_blocks.push(latest_blocks[&own_instance].clone());
534 }
535 let ancestors: Vec<_> = ancestor_blocks.iter().map(|b| b.reference()).collect();
536
537 let mut newly_connected = Vec::new();
539 let mut queue = VecDeque::from_iter(ancestors.iter().copied());
540 while let Some(block_ref) = queue.pop_front() {
541 if block_ref.round == 0 {
542 continue; }
544 if included_refs
545 .entry(own_instance)
546 .or_default()
547 .contains(&block_ref)
548 {
549 continue; }
551 included_refs
552 .entry(own_instance)
553 .or_default()
554 .insert(block_ref);
555 newly_connected.push(block_ref);
556 if let Some(block) = all_blocks.get(&block_ref) {
558 queue.extend(block.ancestors().iter().cloned());
559 }
560 }
561
562 let votes: Vec<_> = newly_connected
564 .iter()
565 .filter(|_| reject_percentage > 0)
566 .filter_map(|&block_ref| {
567 let rejects: Vec<_> = (0..num_transactions)
568 .filter(|_| rng.gen_range(0..100) < reject_percentage)
569 .map(|idx| idx as TransactionIndex)
570 .collect();
571 (!rejects.is_empty()).then_some(BlockTransactionVotes { block_ref, rejects })
572 })
573 .collect();
574
575 let transactions: Vec<_> = (0..num_transactions)
576 .map(|_| Transaction::new(vec![1_u8; 16]))
577 .collect();
578
579 let timestamp = (round as u64) * 1000 + (own_authority.value() as u64) + rng.gen_range(0..100);
580
581 VerifiedBlock::new_for_test(
582 TestBlock::new(round, own_authority.value() as u32)
583 .set_transactions(transactions)
584 .set_transaction_votes(votes)
585 .set_ancestors(ancestors)
586 .set_timestamp_ms(timestamp)
587 .build(),
588 )
589}
590
591#[derive(Clone, Default)]
593struct RoundState {
594 visited_stake: Stake,
596 unvisited: Vec<usize>,
598}
599
600pub struct RandomDagIterator<'a> {
603 dag: &'a RandomDag,
604 rng: &'a mut StdRng,
605 quorum_threshold: Stake,
606 max_step: Round,
607 quorum_round: Round,
609 completed_round: Round,
611 round_states: Vec<RoundState>,
613 num_remaining: usize,
615}
616
617impl<'a> RandomDagIterator<'a> {
618 fn new(dag: &'a RandomDag, rng: &'a mut StdRng, max_step: Round) -> Self {
619 let num_rounds = dag.num_rounds as usize;
620 let committee = &dag.context.committee;
621 let quorum_threshold = committee.quorum_threshold();
622
623 let mut round_states: Vec<RoundState> = vec![RoundState::default(); num_rounds + 1];
624
625 for (idx, block) in dag.blocks.iter().enumerate() {
626 let round = block.round() as usize;
627 round_states[round].unvisited.push(idx);
628 }
629
630 let num_remaining = dag.blocks.len();
631
632 Self {
633 dag,
634 rng,
635 max_step,
636 quorum_round: 0,
637 completed_round: 0,
638 quorum_threshold,
639 round_states,
640 num_remaining,
641 }
642 }
643}
644
645impl Iterator for RandomDagIterator<'_> {
646 type Item = VerifiedBlock;
647
648 fn next(&mut self) -> Option<Self::Item> {
658 if self.num_remaining == 0 {
659 return None;
660 }
661
662 let min_round = self.completed_round as usize + 1;
664 let max_round =
665 ((self.quorum_round + self.max_step) as usize).min(self.round_states.len() - 1);
666 let eligible_rounds = min_round..=max_round;
667
668 let total_candidates: usize = eligible_rounds
669 .clone()
670 .map(|r| self.round_states[r].unvisited.len())
671 .sum();
672
673 if total_candidates == 0 {
674 return None;
675 }
676
677 let mut selection = self.rng.gen_range(0..total_candidates);
679 let mut selected_round = 0;
680 let mut selected_pos = 0;
681
682 for r in eligible_rounds {
683 let count = self.round_states[r].unvisited.len();
684 if selection < count {
685 selected_round = r;
686 selected_pos = selection;
687 break;
688 }
689 selection -= count;
690 }
691
692 let block_idx = self.round_states[selected_round]
694 .unvisited
695 .swap_remove(selected_pos);
696 let block = self.dag.blocks[block_idx].clone();
697
698 let stake = self.dag.context.committee.stake(block.author());
700 self.round_states[selected_round].visited_stake += stake;
701 self.num_remaining -= 1;
702
703 while self
705 .round_states
706 .get(self.completed_round as usize + 1)
707 .is_some_and(|s| s.unvisited.is_empty())
708 {
709 self.completed_round += 1;
710 }
711
712 while self
714 .round_states
715 .get(self.quorum_round as usize + 1)
716 .is_some_and(|s| s.visited_stake >= self.quorum_threshold)
717 {
718 self.quorum_round += 1;
719 }
720
721 Some(block)
722 }
723}