1use std::collections::{BTreeMap, BTreeSet, VecDeque};
8use std::ops::Bound::Included;
9use std::sync::Arc;
10
11use consensus_config::{AuthorityIndex, Stake};
12use consensus_types::block::{BlockDigest, BlockRef};
13use consensus_types::block::{Round, TransactionIndex};
14use mysten_metrics::monitored_mpsc::unbounded_channel;
15use parking_lot::RwLock;
16use rand::prelude::SliceRandom;
17use rand::{Rng, rngs::StdRng};
18
19use crate::Transaction;
20use crate::block::{BlockTransactionVotes, TestBlock, genesis_blocks};
21use crate::{
22 block::{BlockAPI, VerifiedBlock},
23 block_manager::BlockManager,
24 block_verifier::NoopBlockVerifier,
25 commit::{CommittedSubDag, DecidedLeader},
26 commit_finalizer::CommitFinalizer,
27 context::Context,
28 dag_state::DagState,
29 leader_schedule::{LeaderSchedule, LeaderSwapTable},
30 linearizer::Linearizer,
31 storage::mem_store::MemStore,
32 transaction_certifier::TransactionCertifier,
33 universal_committer::{
34 UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
35 },
36};
37
/// Fixture bundling the components needed to accept blocks, decide leaders and
/// finalize commits.
///
/// Every component is built from the same [`Context`] and shares one
/// [`DagState`] (see [`CommitTestFixture::new`]).
pub struct CommitTestFixture {
    /// Context that all other components of the fixture were constructed from.
    pub context: Arc<Context>,
    /// Turns committed leaders into committed sub-dags (used by `process_commits`).
    pub linearizer: Linearizer,
    /// Receives every block added through the fixture, together with the
    /// transaction indices passed alongside it.
    pub transaction_certifier: TransactionCertifier,
    /// Processes each committed sub-dag and returns the finalized commits.
    pub commit_finalizer: CommitFinalizer,

    /// DAG state shared by the committer, block manager, linearizer,
    /// transaction certifier and commit finalizer.
    dag_state: Arc<RwLock<DagState>>,
    /// Used by `try_accept_blocks`; `has_no_suspended_blocks` reports whether it is empty.
    block_manager: BlockManager,
    /// Decides the leader sequence in `try_commit`.
    committer: UniversalCommitter,
}
50
51impl CommitTestFixture {
52 pub fn new(context: Arc<Context>) -> Self {
54 let leader_schedule = Arc::new(LeaderSchedule::new(
55 context.clone(),
56 LeaderSwapTable::default(),
57 ));
58 let dag_state = Arc::new(RwLock::new(DagState::new(
59 context.clone(),
60 Arc::new(MemStore::new()),
61 )));
62
63 let committer =
65 UniversalCommitterBuilder::new(context.clone(), leader_schedule, dag_state.clone())
66 .with_pipeline(true)
67 .build();
68
69 let block_manager = BlockManager::new(context.clone(), dag_state.clone());
70
71 let linearizer = Linearizer::new(context.clone(), dag_state.clone());
72 let (blocks_sender, _blocks_receiver) = unbounded_channel("consensus_block_output");
73 let transaction_certifier = TransactionCertifier::new(
74 context.clone(),
75 Arc::new(NoopBlockVerifier {}),
76 dag_state.clone(),
77 blocks_sender,
78 );
79 let (commit_sender, _commit_receiver) = unbounded_channel("consensus_commit_output");
80 let commit_finalizer = CommitFinalizer::new(
81 context.clone(),
82 dag_state.clone(),
83 transaction_certifier.clone(),
84 commit_sender,
85 );
86
87 Self {
88 context,
89 linearizer,
90 transaction_certifier,
91 commit_finalizer,
92 dag_state,
93 block_manager,
94 committer,
95 }
96 }
97
98 pub fn with_options(
100 num_authorities: usize,
101 authority_index: u32,
102 gc_depth: Option<u32>,
103 ) -> Self {
104 Self::new(Self::context_with_options(
105 num_authorities,
106 authority_index,
107 gc_depth,
108 ))
109 }
110
111 pub fn context_with_options(
112 num_authorities: usize,
113 authority_index: u32,
114 gc_depth: Option<u32>,
115 ) -> Arc<Context> {
116 let (mut context, _keys) = Context::new_for_test(num_authorities);
117 if let Some(gc_depth) = gc_depth {
118 context
119 .protocol_config
120 .set_consensus_gc_depth_for_testing(gc_depth);
121 }
122 context.parameters.internal.skip_equivocation_validation = true;
125 Arc::new(context.with_authority_index(AuthorityIndex::new_for_test(authority_index)))
126 }
127
128 pub fn try_accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
131 self.transaction_certifier
132 .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
133 self.block_manager.try_accept_blocks(blocks);
134 }
135
136 pub fn add_blocks(&self, blocks: Vec<VerifiedBlock>) {
139 let blocks_and_votes = blocks.iter().map(|b| (b.clone(), vec![])).collect();
140 self.transaction_certifier
141 .add_voted_blocks(blocks_and_votes);
142 self.dag_state.write().accept_blocks(blocks);
143 }
144
145 pub fn add_blocks_with_own_votes(
146 &self,
147 blocks_and_votes: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
148 ) {
149 let blocks = blocks_and_votes.iter().map(|(b, _)| b.clone()).collect();
150 self.transaction_certifier
151 .add_voted_blocks(blocks_and_votes);
152 self.dag_state.write().accept_blocks(blocks);
153 }
154
155 #[cfg(test)]
157 pub(crate) fn has_no_suspended_blocks(&self) -> bool {
158 self.block_manager.is_empty()
159 }
160
161 pub async fn try_commit(
164 &mut self,
165 last_decided: crate::block::Slot,
166 ) -> (Vec<CommittedSubDag>, crate::block::Slot) {
167 let sequence = self.committer.try_decide(last_decided);
168 let new_last_decided = sequence
169 .last()
170 .map(|leader| leader.slot())
171 .unwrap_or(last_decided);
172 let finalized = self.process_commits(sequence).await;
173 (finalized, new_last_decided)
174 }
175
176 pub(crate) async fn process_commits(
182 &mut self,
183 sequence: Vec<DecidedLeader>,
184 ) -> Vec<CommittedSubDag> {
185 let leaders: Vec<VerifiedBlock> = sequence
187 .into_iter()
188 .filter_map(|d| match d {
189 DecidedLeader::Commit(block, _) => Some(block),
190 DecidedLeader::Skip(_) => None,
191 })
192 .collect();
193
194 if leaders.is_empty() {
195 return vec![];
196 }
197
198 let committed_sub_dags = self.linearizer.handle_commit(leaders);
200
201 self.block_manager
204 .try_unsuspend_blocks_for_latest_gc_round();
205
206 let mut finalized_commits = vec![];
208 for mut subdag in committed_sub_dags {
209 subdag.decided_with_local_blocks = true;
210 let finalized = self.commit_finalizer.process_commit(subdag).await;
211 finalized_commits.extend(finalized);
212 }
213
214 finalized_commits
215 }
216}
217
218pub fn assert_commit_sequences_match(
221 commit_sequences: Vec<Vec<CommittedSubDag>>,
222) -> Vec<CommittedSubDag> {
223 let (shortest_idx, shortest_sequence) = commit_sequences
224 .iter()
225 .enumerate()
226 .min_by_key(|(_, seq)| seq.len())
227 .expect("commit_sequences should not be empty");
228
229 for (run, commit_sequence) in commit_sequences.iter().enumerate() {
230 assert!(
234 commit_sequence.len() <= shortest_sequence.len() + 3,
235 "Commit sequence at run {run} is more than 3 commits longer than shortest (run {shortest_idx}): {} vs {}",
236 commit_sequence.len(),
237 shortest_sequence.len()
238 );
239
240 for (commit_index, (c1, c2)) in commit_sequence
241 .iter()
242 .zip(shortest_sequence.iter())
243 .enumerate()
244 {
245 assert_eq!(
246 c1.leader, c2.leader,
247 "Leader mismatch at run {run} commit {commit_index}"
248 );
249 assert_eq!(
250 c1.commit_ref, c2.commit_ref,
251 "Commit sequence mismatch at run {run} commit {commit_index}"
252 );
253 assert_eq!(
254 c1.rejected_transactions_by_block, c2.rejected_transactions_by_block,
255 "Rejected transactions mismatch at run {run} commit {commit_index}"
256 );
257 }
258 }
259
260 let mut total_transactions = 0;
261 let mut rejected_transactions = 0;
262 let mut reject_votes = 0;
263 let mut blocks = 4;
264 for commit in shortest_sequence.iter() {
265 total_transactions += commit
266 .blocks
267 .iter()
268 .map(|block| block.transactions().len())
269 .sum::<usize>();
270 rejected_transactions += commit
271 .rejected_transactions_by_block
272 .values()
273 .map(|transactions| transactions.len())
274 .sum::<usize>();
275 reject_votes += commit
276 .blocks
277 .iter()
278 .map(|block| block.transaction_votes().len())
279 .sum::<usize>();
280 blocks += commit.blocks.len();
281 }
282
283 tracing::info!(
284 "Finished comparing commit sequences. Commits: {}, Blocks: {}, Total transactions: {}, Rejected transactions: {}, Reject votes: {}",
285 shortest_sequence.len(),
286 blocks,
287 total_transactions,
288 rejected_transactions,
289 reject_votes
290 );
291
292 shortest_sequence.clone()
293}
294
/// Parameters controlling the DAG generated by [`RandomDag::new`].
pub struct RandomDagConfig {
    /// Not read by [`RandomDag::new`]; the set of authorities is taken from the
    /// committee of the context passed to it.
    pub num_authorities: usize,
    /// Blocks are generated for rounds `1..=num_rounds`.
    pub num_rounds: Round,
    /// Number of transactions placed in every generated block.
    pub num_transactions: u32,
    /// Each transaction index of a newly linked block is rejected when a draw
    /// from `0..100` is below this value; `0` disables reject votes entirely.
    pub reject_percentage: u8,
    /// For each listed authority, the number of additional (equivocating)
    /// instances, numbered `1..=n`, created on top of its instance `0`.
    pub equivocators: Vec<(AuthorityIndex, u16)>,
}
311
/// A set of blocks forming a DAG, either randomly generated ([`RandomDag::new`])
/// or supplied by the caller ([`RandomDag::from_blocks`]).
pub struct RandomDag {
    /// Context whose committee supplies stakes and the quorum threshold.
    context: Arc<Context>,
    /// All blocks of the DAG. When built by [`RandomDag::new`] this includes the
    /// genesis blocks and is ordered by `BlockRef`.
    pub blocks: Vec<VerifiedBlock>,
    /// Highest round of the DAG: the configured round count for `new`, the
    /// maximum block round (or 0 if empty) for `from_blocks`.
    num_rounds: Round,
}
318
impl RandomDag {
    /// Generates a random DAG for the committee in `context`, drawing all
    /// randomness from `rng`.
    ///
    /// Every committee member gets instance `0`; `config.equivocators` adds
    /// extra instances per authority. For each round `1..=num_rounds` a random
    /// subset of instances proposes one block each (see
    /// `build_block_for_instance`). `config.num_authorities` is ignored.
    ///
    /// The resulting `blocks` contain the genesis blocks plus all generated
    /// blocks, in `BlockRef` order.
    pub fn new(context: Arc<Context>, rng: &mut StdRng, config: RandomDagConfig) -> Self {
        let RandomDagConfig {
            num_authorities: _,
            num_rounds,
            num_transactions,
            reject_percentage,
            equivocators,
        } = config;

        let committee = &context.committee;
        let quorum_threshold = committee.quorum_threshold();
        let total_stake = committee.total_stake();

        // Instance 0 for every authority, then instances 1..=n for equivocators.
        let mut instances: Vec<InstanceID> = committee.authorities().map(|(a, _)| (a, 0)).collect();
        for (authority, num_equivocators) in equivocators {
            for i in 1..=num_equivocators {
                instances.push((authority, i));
            }
        }

        let genesis_blocks = genesis_blocks(&context)
            .into_iter()
            .map(|b| (b.author(), b))
            .collect::<BTreeMap<AuthorityIndex, VerifiedBlock>>();
        let last_round_blocks: Vec<VerifiedBlock> = genesis_blocks.values().cloned().collect();

        // Every block created so far, keyed by reference; seeded with genesis.
        let mut all_blocks: BTreeMap<BlockRef, VerifiedBlock> = BTreeMap::new();
        for block in &last_round_blocks {
            all_blocks.insert(block.reference(), block.clone());
        }

        // Latest block per instance; every instance of an authority starts from
        // that authority's genesis block. Panics if an authority has no genesis block.
        let mut latest_blocks = instances
            .iter()
            .map(|&(a, i)| {
                let b = genesis_blocks.get(&a).unwrap();
                ((a, i), b.clone())
            })
            .collect::<BTreeMap<InstanceID, VerifiedBlock>>();

        // Per instance, the block refs already reachable from its own blocks.
        let mut included_refs = BTreeMap::<InstanceID, BTreeSet<BlockRef>>::new();

        for r in 1..=num_rounds {
            // Stake that this round's proposers should cover: at least a quorum.
            let target_stake = rng.gen_range(quorum_threshold..=total_stake);

            let mut proposers = instances.clone();
            proposers.shuffle(rng);
            let mut selected_stake = 0;
            let mut selected_authorities = vec![false; committee.size()];
            // Take shuffled instances until the distinct authorities taken so far
            // reach the target stake. An authority's stake is counted only once,
            // even when several of its instances are taken.
            let selected_proposers: Vec<_> = proposers
                .into_iter()
                .take_while(|instance| {
                    if selected_stake >= target_stake {
                        return false;
                    }
                    if !selected_authorities[instance.0.value()] {
                        selected_authorities[instance.0.value()] = true;
                        selected_stake += committee.stake(instance.0);
                    }
                    true
                })
                .collect();

            let mut current_round_blocks = Vec::new();
            for instance_id in selected_proposers {
                let block = build_block_for_instance(
                    &context,
                    &instances,
                    rng,
                    r,
                    instance_id,
                    num_transactions,
                    reject_percentage,
                    &all_blocks,
                    &mut latest_blocks,
                    &mut included_refs,
                );
                current_round_blocks.push((instance_id, block));
            }

            // Publish the round only after all of its blocks are built, so blocks
            // of the same round never reference each other.
            for (instance_id, block) in current_round_blocks {
                all_blocks.insert(block.reference(), block.clone());
                latest_blocks.insert(instance_id, block);
            }
        }

        RandomDag {
            context,
            blocks: all_blocks.values().cloned().collect(),
            num_rounds,
        }
    }

    /// Wraps caller-provided `blocks` as a DAG. `num_rounds` is the highest
    /// round among the blocks, or 0 when `blocks` is empty.
    pub fn from_blocks(context: Arc<Context>, blocks: Vec<VerifiedBlock>) -> Self {
        let num_rounds = blocks.iter().map(|b| b.round()).max().unwrap_or(0);
        RandomDag {
            context,
            blocks,
            num_rounds,
        }
    }

    /// Returns an iterator that yields the DAG's blocks in a random order driven
    /// by `rng`; `max_step` bounds how many rounds beyond the latest round with
    /// a quorum of yielded stake a block may be picked from.
    pub fn random_iter<'a>(
        &'a self,
        rng: &'a mut StdRng,
        max_step: Round,
    ) -> RandomDagIterator<'a> {
        RandomDagIterator::new(self, rng, max_step)
    }
}
443
/// Identifies one block-producing instance of an authority as
/// `(authority, instance number)`. Instance `0` exists for every authority;
/// higher numbers are the extra instances of equivocating authorities.
type InstanceID = (AuthorityIndex, u16);
446
/// Builds the block proposed by `own_instance` at `round`.
///
/// Ancestors are chosen as follows:
/// 1. a random set of blocks from `round - 1`, one per authority, until their
///    stake reaches the quorum threshold;
/// 2. the latest blocks of a random number of instances whose authority was
///    not picked in step 1 (at most one per authority);
/// 3. the latest block of `own_instance`, if no ancestor so far is authored by
///    the same authority.
///
/// Block refs that become reachable from this instance for the first time are
/// recorded in `included_refs` and are the only blocks that may receive reject
/// votes. `latest_blocks` is only read here.
///
/// The built block is authored by the authority of `own_instance`; the instance
/// number itself is not passed to the block builder.
///
/// # Panics
///
/// Panics if `latest_blocks` has no entry for `own_instance`, or if a block
/// taken from `latest_blocks` in step 2 is not from an earlier round. `round`
/// is expected to be at least 1 (`round - 1` is computed).
fn build_block_for_instance(
    context: &Arc<Context>,
    instances: &[InstanceID],
    rng: &mut StdRng,
    round: Round,
    own_instance: InstanceID,
    num_transactions: u32,
    reject_percentage: u8,
    all_blocks: &BTreeMap<BlockRef, VerifiedBlock>,
    latest_blocks: &mut BTreeMap<InstanceID, VerifiedBlock>,
    included_refs: &mut BTreeMap<InstanceID, BTreeSet<BlockRef>>,
) -> VerifiedBlock {
    let committee = &context.committee;
    let quorum_threshold = committee.quorum_threshold();
    let own_authority = own_instance.0;

    let prev_round = round - 1;
    // All blocks between the smallest and largest possible `BlockRef` of
    // `prev_round`, i.e. the blocks of the previous round.
    let mut prev_round_blocks: Vec<_> = all_blocks
        .range((
            Included(BlockRef::new(
                prev_round,
                AuthorityIndex::MIN,
                BlockDigest::MIN,
            )),
            Included(BlockRef::new(
                prev_round,
                AuthorityIndex::MAX,
                BlockDigest::MAX,
            )),
        ))
        .map(|(_, b)| b)
        .cloned()
        .collect();
    prev_round_blocks.shuffle(rng);
    let mut parent_stake: Stake = 0;
    let mut selected_authorities = vec![false; committee.size()];
    // Step 1: walk the shuffled previous-round blocks, keeping one block per
    // authority until the accumulated stake reaches a quorum.
    let quorum_parents: Vec<_> = prev_round_blocks
        .into_iter()
        .filter_map(|b| {
            if parent_stake >= quorum_threshold {
                return None;
            }
            if selected_authorities[b.author().value()] {
                return None;
            }
            selected_authorities[b.author().value()] = true;
            parent_stake += committee.stake(b.author());
            Some(b)
        })
        .collect();

    // Step 2 candidates: instances of authorities not used as quorum parents.
    let mut unselected_instances: Vec<_> = instances
        .iter()
        .filter(|(authority, _)| !selected_authorities[authority.value()])
        .cloned()
        .collect();

    unselected_instances.shuffle(rng);
    // Minimum of two uniform draws, which skews the count towards smaller values.
    let extra_count = rng
        .gen_range(0..=unselected_instances.len())
        .min(rng.gen_range(0..=unselected_instances.len()));
    let additional_ancestor_blocks: Vec<_> = unselected_instances[0..extra_count]
        .iter()
        .filter_map(|&(authority, instance)| {
            // Several instances of one authority may be candidates; use only the first.
            if selected_authorities[authority.value()] {
                return None;
            }
            let block = latest_blocks.get(&(authority, instance))?;
            assert!(
                block.round() < round,
                "latest_blocks should only contain blocks from previous rounds"
            );
            selected_authorities[authority.value()] = true;
            Some(block.clone())
        })
        .collect();

    let mut ancestor_blocks = quorum_parents;
    ancestor_blocks.extend(additional_ancestor_blocks);
    // Step 3: make sure some block authored by the own authority is an ancestor.
    if !ancestor_blocks.iter().any(|b| b.author() == own_authority) {
        ancestor_blocks.push(latest_blocks[&own_instance].clone());
    }
    let ancestors: Vec<_> = ancestor_blocks.iter().map(|b| b.reference()).collect();

    // Breadth-first walk over the ancestry, collecting refs this instance has
    // not linked to before. Round-0 refs and already known refs are not
    // recorded or expanded.
    let mut newly_connected = Vec::new();
    let mut queue = VecDeque::from_iter(ancestors.iter().copied());
    while let Some(block_ref) = queue.pop_front() {
        if block_ref.round == 0 {
            continue;
        }
        if included_refs
            .entry(own_instance)
            .or_default()
            .contains(&block_ref)
        {
            continue;
        }
        included_refs
            .entry(own_instance)
            .or_default()
            .insert(block_ref);
        newly_connected.push(block_ref);
        if let Some(block) = all_blocks.get(&block_ref) {
            queue.extend(block.ancestors().iter().cloned());
        }
    }

    // For each newly linked block, reject each transaction index independently
    // when a draw from 0..100 is below `reject_percentage`. Blocks without any
    // reject get no vote entry. With a percentage of 0 no random draws are made.
    let votes: Vec<_> = newly_connected
        .iter()
        .filter(|_| reject_percentage > 0)
        .filter_map(|&block_ref| {
            let rejects: Vec<_> = (0..num_transactions)
                .filter(|_| rng.gen_range(0..100) < reject_percentage)
                .map(|idx| idx as TransactionIndex)
                .collect();
            (!rejects.is_empty()).then_some(BlockTransactionVotes { block_ref, rejects })
        })
        .collect();

    // `num_transactions` identical transactions of 16 bytes each.
    let transactions: Vec<_> = (0..num_transactions)
        .map(|_| Transaction::new(vec![1_u8; 16]))
        .collect();

    // Round-based timestamp, offset by the authority index plus a random 0..100.
    let timestamp = (round as u64) * 1000 + (own_authority.value() as u64) + rng.gen_range(0..100);

    VerifiedBlock::new_for_test(
        TestBlock::new(round, own_authority.value() as u32)
            .set_transactions(transactions)
            .set_transaction_votes(votes)
            .set_ancestors(ancestors)
            .set_timestamp_ms(timestamp)
            .build(),
    )
}
592
/// Per-round bookkeeping of [`RandomDagIterator`].
#[derive(Clone, Default)]
struct RoundState {
    /// Sum of the stakes of the authors of all blocks already yielded from this
    /// round (added once per yielded block).
    visited_stake: Stake,
    /// Indices into the DAG's `blocks` of this round's blocks not yet yielded.
    unvisited: Vec<usize>,
}
601
/// Iterator yielding the blocks of a [`RandomDag`] in a constrained random order.
///
/// Each step picks uniformly among the unvisited blocks of rounds
/// `completed_round + 1 ..= quorum_round + max_step` (capped at the last round).
pub struct RandomDagIterator<'a> {
    /// DAG whose blocks are being iterated.
    dag: &'a RandomDag,
    /// Source of randomness for picking the next block.
    rng: &'a mut StdRng,
    /// Quorum threshold of the DAG's committee.
    quorum_threshold: Stake,
    /// How many rounds past `quorum_round` blocks may be picked from.
    max_step: Round,
    /// Highest round such that it and every round from 1 up to it have
    /// `visited_stake` of at least `quorum_threshold`.
    quorum_round: Round,
    /// Highest round such that it and every round from 1 up to it have no
    /// unvisited blocks left.
    completed_round: Round,
    /// State per round, indexed by round number (index 0 is round 0).
    round_states: Vec<RoundState>,
    /// Starts at the total number of blocks in the DAG and decreases by one per
    /// yielded block.
    num_remaining: usize,
}
618
619impl<'a> RandomDagIterator<'a> {
620 fn new(dag: &'a RandomDag, rng: &'a mut StdRng, max_step: Round) -> Self {
621 let num_rounds = dag.num_rounds as usize;
622 let committee = &dag.context.committee;
623 let quorum_threshold = committee.quorum_threshold();
624
625 let mut round_states: Vec<RoundState> = vec![RoundState::default(); num_rounds + 1];
626
627 for (idx, block) in dag.blocks.iter().enumerate() {
628 let round = block.round() as usize;
629 round_states[round].unvisited.push(idx);
630 }
631
632 let num_remaining = dag.blocks.len();
633
634 Self {
635 dag,
636 rng,
637 max_step,
638 quorum_round: 0,
639 completed_round: 0,
640 quorum_threshold,
641 round_states,
642 num_remaining,
643 }
644 }
645}
646
647impl Iterator for RandomDagIterator<'_> {
648 type Item = VerifiedBlock;
649
650 fn next(&mut self) -> Option<Self::Item> {
660 if self.num_remaining == 0 {
661 return None;
662 }
663
664 let min_round = self.completed_round as usize + 1;
666 let max_round =
667 ((self.quorum_round + self.max_step) as usize).min(self.round_states.len() - 1);
668 let eligible_rounds = min_round..=max_round;
669
670 let total_candidates: usize = eligible_rounds
671 .clone()
672 .map(|r| self.round_states[r].unvisited.len())
673 .sum();
674
675 if total_candidates == 0 {
676 return None;
677 }
678
679 let mut selection = self.rng.gen_range(0..total_candidates);
681 let mut selected_round = 0;
682 let mut selected_pos = 0;
683
684 for r in eligible_rounds {
685 let count = self.round_states[r].unvisited.len();
686 if selection < count {
687 selected_round = r;
688 selected_pos = selection;
689 break;
690 }
691 selection -= count;
692 }
693
694 let block_idx = self.round_states[selected_round]
696 .unvisited
697 .swap_remove(selected_pos);
698 let block = self.dag.blocks[block_idx].clone();
699
700 let stake = self.dag.context.committee.stake(block.author());
702 self.round_states[selected_round].visited_stake += stake;
703 self.num_remaining -= 1;
704
705 while self
707 .round_states
708 .get(self.completed_round as usize + 1)
709 .is_some_and(|s| s.unvisited.is_empty())
710 {
711 self.completed_round += 1;
712 }
713
714 while self
716 .round_states
717 .get(self.quorum_round as usize + 1)
718 .is_some_and(|s| s.visited_stake >= self.quorum_threshold)
719 {
720 self.quorum_round += 1;
721 }
722
723 Some(block)
724 }
725}