1use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_metrics::monitored_mpsc::UnboundedSender;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13 BlockAPI as _, CertifiedBlock, CertifiedBlocksOutput, VerifiedBlock,
14 block::{BlockTransactionVotes, GENESIS_ROUND},
15 block_verifier::BlockVerifier,
16 context::Context,
17 dag_state::DagState,
18 stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21#[derive(Clone)]
48pub struct TransactionCertifier {
49 certifier_state: Arc<RwLock<CertifierState>>,
51 block_verifier: Arc<dyn BlockVerifier>,
53 dag_state: Arc<RwLock<DagState>>,
55 certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
57}
58
59impl TransactionCertifier {
60 pub fn new(
61 context: Arc<Context>,
62 block_verifier: Arc<dyn BlockVerifier>,
63 dag_state: Arc<RwLock<DagState>>,
64 certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
65 ) -> Self {
66 Self {
67 certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
68 block_verifier,
69 dag_state,
70 certified_blocks_sender,
71 }
72 }
73
74 pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
79 let context = self.certifier_state.read().context.clone();
80 if !context.protocol_config.mysticeti_fastpath() {
81 info!("Skipping certifier recovery in non-mysticeti fast path mode");
82 return;
83 }
84
85 let store = self.dag_state.read().store().clone();
86
87 let recovery_start_round = after_round + 1;
88 info!(
89 "Recovering certifier state from round {}",
90 recovery_start_round,
91 );
92
93 let authorities = context
94 .committee
95 .authorities()
96 .map(|(index, _)| index)
97 .collect::<Vec<_>>();
98 for authority_index in authorities {
99 let blocks = store
100 .scan_blocks_by_author(authority_index, recovery_start_round)
101 .unwrap();
102 info!(
103 "Recovered and voting on {} blocks from authority {} {}",
104 blocks.len(),
105 authority_index,
106 context.committee.authority(authority_index).hostname
107 );
108 self.recover_and_vote_on_blocks(blocks);
109 }
110 }
111
112 pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
120 let should_vote_blocks = {
121 let dag_state = self.dag_state.read();
122 let gc_round = dag_state.gc_round();
123 blocks
124 .iter()
125 .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
127 .collect::<Vec<_>>()
128 };
129 let voted_blocks = blocks
130 .into_iter()
131 .zip(should_vote_blocks)
132 .map(|(b, should_vote)| {
133 if !should_vote {
134 (b, vec![])
137 } else {
138 let reject_transaction_votes =
141 self.block_verifier.vote(&b).unwrap_or_else(|e| {
142 panic!("Failed to vote on block during recovery: {}", e)
143 });
144 (b, reject_transaction_votes)
145 }
146 })
147 .collect::<Vec<_>>();
148 self.certifier_state.write().add_voted_blocks(voted_blocks);
149 }
153
154 pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
157 let certified_blocks = self.certifier_state.write().add_voted_blocks(voted_blocks);
158 self.send_certified_blocks(certified_blocks);
159 }
160
161 pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
164 let certified_blocks = self
165 .certifier_state
166 .write()
167 .add_proposed_block(proposed_block);
168 self.send_certified_blocks(certified_blocks);
169 }
170
171 fn send_certified_blocks(&self, certified_blocks: Vec<CertifiedBlock>) {
173 if certified_blocks.is_empty() {
174 return;
175 }
176 if let Err(e) = self.certified_blocks_sender.send(CertifiedBlocksOutput {
177 blocks: certified_blocks,
178 }) {
179 tracing::warn!("Failed to send certified blocks: {:?}", e);
180 }
181 }
182
183 pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
185 let mut votes = vec![];
186 let certifier_state = self.certifier_state.read();
187 for block_ref in block_refs {
188 if block_ref.round <= certifier_state.gc_round {
189 continue;
190 }
191 let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
192 panic!("Ancestor block {} not found in certifier state", block_ref)
193 });
194 if !vote_info.own_reject_txn_votes.is_empty() {
195 votes.push(BlockTransactionVotes {
196 block_ref,
197 rejects: vote_info.own_reject_txn_votes.clone(),
198 });
199 }
200 }
201 votes
202 }
203
204 pub(crate) fn get_reject_votes(
208 &self,
209 block_ref: &BlockRef,
210 ) -> Option<Vec<(TransactionIndex, Stake)>> {
211 let accumulated_reject_votes = self
212 .certifier_state
213 .read()
214 .votes
215 .get(block_ref)?
216 .reject_txn_votes
217 .iter()
218 .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
219 .collect::<Vec<_>>();
220 Some(accumulated_reject_votes)
221 }
222
223 pub(crate) fn run_gc(&self, gc_round: Round) {
231 let dag_state_gc_round = self.dag_state.read().gc_round();
232 assert!(
233 gc_round <= dag_state_gc_round,
234 "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
235 gc_round,
236 dag_state_gc_round
237 );
238 self.certifier_state.write().update_gc_round(gc_round);
239 }
240}
241
242struct CertifierState {
246 context: Arc<Context>,
247
248 votes: BTreeMap<BlockRef, VoteInfo>,
251
252 gc_round: Round,
254}
255
256impl CertifierState {
257 fn new(context: Arc<Context>) -> Self {
258 Self {
259 context,
260 votes: BTreeMap::new(),
261 gc_round: GENESIS_ROUND,
262 }
263 }
264
265 fn add_voted_blocks(
266 &mut self,
267 voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
268 ) -> Vec<CertifiedBlock> {
269 let mut certified_blocks = vec![];
270 for (voted_block, reject_txn_votes) in voted_blocks {
271 let blocks = self.add_voted_block(voted_block, reject_txn_votes);
272 certified_blocks.extend(blocks);
273 }
274
275 if !certified_blocks.is_empty() {
276 self.context
277 .metrics
278 .node_metrics
279 .certifier_output_blocks
280 .with_label_values(&["voted"])
281 .inc_by(certified_blocks.len() as u64);
282 }
283
284 certified_blocks
285 }
286
287 fn add_voted_block(
288 &mut self,
289 voted_block: VerifiedBlock,
290 reject_txn_votes: Vec<TransactionIndex>,
291 ) -> Vec<CertifiedBlock> {
292 if voted_block.round() <= self.gc_round {
293 return vec![];
295 }
296
297 let peer_hostname = &self
299 .context
300 .committee
301 .authority(voted_block.author())
302 .hostname;
303 self.context
304 .metrics
305 .node_metrics
306 .certifier_own_reject_votes
307 .with_label_values(&[peer_hostname])
308 .inc_by(reject_txn_votes.len() as u64);
309
310 let vote_info = self.votes.entry(voted_block.reference()).or_default();
312 if vote_info.block.is_some() {
313 return vec![];
315 }
316 vote_info.block = Some(voted_block.clone());
317 vote_info.own_reject_txn_votes = reject_txn_votes;
318
319 let mut certified_blocks = vec![];
320
321 let now = self.context.clock.timestamp_utc_ms();
322
323 for block_votes in voted_block.transaction_votes() {
325 if block_votes.block_ref.round <= self.gc_round {
326 continue;
328 }
329 let vote_info = self.votes.entry(block_votes.block_ref).or_default();
330 for reject in &block_votes.rejects {
331 vote_info
332 .reject_txn_votes
333 .entry(*reject)
334 .or_default()
335 .add_unique(voted_block.author(), &self.context.committee);
336 }
337 if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
340 let authority_name = self
341 .context
342 .committee
343 .authority(certified_block.block.author())
344 .hostname
345 .clone();
346 self.context
347 .metrics
348 .node_metrics
349 .certifier_block_latency
350 .with_label_values(&[&authority_name])
351 .observe(
352 Duration::from_millis(
353 now.saturating_sub(certified_block.block.timestamp_ms()),
354 )
355 .as_secs_f64(),
356 );
357 certified_blocks.push(certified_block);
358 }
359 }
360
361 certified_blocks
362 }
363
364 fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
365 if proposed_block.round() <= self.gc_round + 2 {
366 return vec![];
371 }
372 debug!(
373 "Adding proposed block {}; gc round: {}",
374 proposed_block.reference(),
375 self.gc_round
376 );
377
378 if !self.votes.contains_key(&proposed_block.reference()) {
379 self.context
380 .metrics
381 .node_metrics
382 .certifier_missing_ancestor_during_certification
383 .with_label_values(&["proposed_block_not_found"])
384 .inc();
385 debug!(
386 "Proposed block {} not found in certifier state. GC round: {}",
387 proposed_block.reference(),
388 self.gc_round,
389 );
390 return vec![];
391 }
392
393 let now = self.context.clock.timestamp_utc_ms();
394
395 let mut certified_blocks = vec![];
399 for voting_ancestor in proposed_block.ancestors() {
400 if voting_ancestor.round + 1 != proposed_block.round() {
402 continue;
403 }
404 let Some(voting_info) = self.votes.get(voting_ancestor) else {
405 self.context
406 .metrics
407 .node_metrics
408 .certifier_missing_ancestor_during_certification
409 .with_label_values(&["voting_info_not_found"])
410 .inc();
411 debug!(
412 "Proposed block {}: voting info not found for ancestor {}",
413 proposed_block.reference(),
414 voting_ancestor
415 );
416 continue;
417 };
418 let Some(voting_block) = voting_info.block.clone() else {
419 self.context
420 .metrics
421 .node_metrics
422 .certifier_missing_ancestor_during_certification
423 .with_label_values(&["voting_block_not_found"])
424 .inc();
425 debug!(
426 "Proposed block {}: voting block not found for ancestor {}",
427 proposed_block.reference(),
428 voting_ancestor
429 );
430 continue;
431 };
432 for target_ancestor in voting_block.ancestors() {
433 if target_ancestor.round + 1 != voting_block.round() {
435 continue;
436 }
437 let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
438 self.context
439 .metrics
440 .node_metrics
441 .certifier_missing_ancestor_during_certification
442 .with_label_values(&["target_vote_info_not_found"])
443 .inc();
444 debug!(
445 "Proposed block {}: target voting info not found for ancestor {}",
446 proposed_block.reference(),
447 target_ancestor
448 );
449 continue;
450 };
451 target_vote_info
452 .accept_block_votes
453 .add_unique(voting_block.author(), &self.context.committee);
454 if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
456 {
457 let authority_name = self
458 .context
459 .committee
460 .authority(certified_block.block.author())
461 .hostname
462 .clone();
463 self.context
464 .metrics
465 .node_metrics
466 .certifier_block_latency
467 .with_label_values(&[&authority_name])
468 .observe(
469 Duration::from_millis(
470 now.saturating_sub(certified_block.block.timestamp_ms()),
471 )
472 .as_secs_f64(),
473 );
474 certified_blocks.push(certified_block);
475 }
476 }
477 }
478
479 if !certified_blocks.is_empty() {
480 self.context
481 .metrics
482 .node_metrics
483 .certifier_output_blocks
484 .with_label_values(&["proposed"])
485 .inc_by(certified_blocks.len() as u64);
486 }
487
488 certified_blocks
489 }
490
491 fn update_gc_round(&mut self, gc_round: Round) {
493 self.gc_round = gc_round;
494 while let Some((block_ref, _)) = self.votes.first_key_value() {
495 if block_ref.round <= self.gc_round {
496 self.votes.pop_first();
497 } else {
498 break;
499 }
500 }
501
502 self.context
503 .metrics
504 .node_metrics
505 .certifier_gc_round
506 .set(self.gc_round as i64);
507 }
508}
509
510struct VoteInfo {
513 block: Option<VerifiedBlock>,
516 own_reject_txn_votes: Vec<TransactionIndex>,
520 accept_block_votes: StakeAggregator<QuorumThreshold>,
522 reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
524 is_certified: bool,
526}
527
528impl VoteInfo {
529 fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
532 let committee = &context.committee;
533 if self.is_certified {
534 return None;
536 }
537 let Some(block) = self.block.as_ref() else {
538 return None;
540 };
541
542 let peer_hostname = &committee.authority(block.author()).hostname;
543
544 if !self.accept_block_votes.reached_threshold(committee) {
545 return None;
547 }
548 let mut rejected = vec![];
549 for (idx, reject_txn_votes) in &self.reject_txn_votes {
550 if reject_txn_votes.reached_threshold(committee) {
552 context
553 .metrics
554 .node_metrics
555 .certifier_rejected_transactions
556 .with_label_values(&[peer_hostname])
557 .inc();
558 rejected.push(*idx);
559 continue;
560 }
561 if self
576 .accept_block_votes
577 .stake()
578 .saturating_sub(reject_txn_votes.stake())
579 < committee.quorum_threshold()
580 {
581 return None;
582 }
583 }
584 let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
586 tracing::trace!(
587 "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
588 block.reference(),
589 rejected.len()
590 );
591 context
592 .metrics
593 .node_metrics
594 .certifier_accepted_transactions
595 .with_label_values(&[peer_hostname])
596 .inc_by(accepted_txn_count as u64);
597 self.is_certified = true;
598 Some(CertifiedBlock {
599 block: block.clone(),
600 rejected,
601 })
602 }
603}
604
605impl Default for VoteInfo {
606 fn default() -> Self {
607 Self {
608 block: None,
609 own_reject_txn_votes: vec![],
610 accept_block_votes: StakeAggregator::new(),
611 reject_txn_votes: BTreeMap::new(),
612 is_certified: false,
613 }
614 }
615}
616
617#[cfg(test)]
618mod test {
619 use consensus_config::AuthorityIndex;
620 use itertools::Itertools;
621 use rand::seq::SliceRandom as _;
622
623 use crate::{
624 TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
625 test_dag_builder::DagBuilder,
626 };
627
628 use super::*;
629
630 #[tokio::test]
631 async fn test_vote_info_basic() {
632 telemetry_subscribers::init_for_testing();
633 let (context, _) = Context::new_for_test(7);
634 let committee = &context.committee;
635
636 {
638 let mut vote_info = VoteInfo::default();
639 let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
640 vote_info.block = Some(block.clone());
641
642 assert!(vote_info.take_certified_output(&context).is_none());
643 }
644
645 {
647 let mut vote_info = VoteInfo::default();
648 let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
649 vote_info.block = Some(block.clone());
650 for i in 0..4 {
651 vote_info
652 .accept_block_votes
653 .add_unique(AuthorityIndex::new_for_test(i), committee);
654 }
655
656 assert!(vote_info.take_certified_output(&context).is_none());
657 }
658
659 {
661 let mut vote_info = VoteInfo::default();
662 for i in 0..5 {
663 vote_info
664 .accept_block_votes
665 .add_unique(AuthorityIndex::new_for_test(i), committee);
666 }
667
668 assert!(vote_info.take_certified_output(&context).is_none());
669 }
670
671 {
673 let mut vote_info = VoteInfo::default();
674 let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
675 vote_info.block = Some(block.clone());
676 for i in 0..4 {
677 vote_info
678 .accept_block_votes
679 .add_unique(AuthorityIndex::new_for_test(i), committee);
680 }
681
682 assert!(vote_info.take_certified_output(&context).is_none());
684
685 vote_info
687 .accept_block_votes
688 .add_unique(AuthorityIndex::new_for_test(4), committee);
689
690 let certified_block = vote_info.take_certified_output(&context).unwrap();
692 assert_eq!(certified_block.block.reference(), block.reference());
693
694 assert!(vote_info.take_certified_output(&context).is_none());
696 }
697
698 {
700 let mut vote_info = VoteInfo::default();
701 let block = VerifiedBlock::new_for_test(
703 TestBlock::new(1, 1)
704 .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
705 .build(),
706 );
707 vote_info.block = Some(block.clone());
708 for i in 0..5 {
710 vote_info
711 .accept_block_votes
712 .add_unique(AuthorityIndex::new_for_test(i), committee);
713 }
714 for reject_tx_idx in 3..8 {
716 vote_info
717 .reject_txn_votes
718 .insert(reject_tx_idx, StakeAggregator::new());
719 for authority_idx in 0..5 {
721 vote_info
722 .reject_txn_votes
723 .get_mut(&reject_tx_idx)
724 .unwrap()
725 .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
726 }
727 }
728
729 let certified_block = vote_info.take_certified_output(&context).unwrap();
731 assert_eq!(certified_block.block.reference(), block.reference());
732
733 assert!(vote_info.take_certified_output(&context).is_none());
735 }
736
737 {
739 let mut vote_info = VoteInfo::default();
740 let block = VerifiedBlock::new_for_test(
742 TestBlock::new(1, 1)
743 .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
744 .build(),
745 );
746 vote_info.block = Some(block.clone());
747 for i in 0..5 {
749 vote_info
750 .accept_block_votes
751 .add_unique(AuthorityIndex::new_for_test(i), committee);
752 }
753 for reject_tx_idx in 3..6 {
755 vote_info
756 .reject_txn_votes
757 .insert(reject_tx_idx, StakeAggregator::new());
758 for authority_idx in 0..5 {
760 vote_info
761 .reject_txn_votes
762 .get_mut(&reject_tx_idx)
763 .unwrap()
764 .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
765 }
766 }
767 vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
769 for authority_idx in 0..4 {
770 vote_info
771 .reject_txn_votes
772 .get_mut(&5)
773 .unwrap()
774 .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
775 }
776
777 assert!(vote_info.take_certified_output(&context).is_none());
779
780 vote_info
782 .reject_txn_votes
783 .get_mut(&5)
784 .unwrap()
785 .add_unique(AuthorityIndex::new_for_test(4), committee);
786
787 let certified_block = vote_info.take_certified_output(&context).unwrap();
789 assert_eq!(certified_block.block.reference(), block.reference());
790
791 assert!(vote_info.take_certified_output(&context).is_none());
793 }
794 }
795
796 #[tokio::test]
797 async fn test_certify_basic() {
798 telemetry_subscribers::init_for_testing();
799 let (context, _) = Context::new_for_test(4);
800 let context = Arc::new(context);
801
802 let mut dag_builder = DagBuilder::new(context.clone());
804 dag_builder.layer(1).num_transactions(4).build();
805 let round_1_blocks = dag_builder.all_blocks();
806 let mut all_blocks = round_1_blocks.clone();
807
808 let mut certifier = CertifierState::new(context.clone());
810 let certified_blocks = certifier
811 .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
812 assert!(certified_blocks.is_empty());
813
814 let transactions = (0..4)
818 .map(|_| Transaction::new(vec![0_u8; 16]))
819 .collect::<Vec<_>>();
820 let ancestors = round_1_blocks
821 .iter()
822 .filter_map(|b| {
823 if b.author().value() < 3 {
824 Some(b.reference())
825 } else {
826 None
827 }
828 })
829 .collect::<Vec<_>>();
830 for author in 0..3 {
831 let mut block = TestBlock::new(2, author)
832 .set_ancestors(ancestors.clone())
833 .set_transactions(transactions.clone());
834 let mut votes = vec![];
835 for i in 0..(3 - author) {
836 let j = author + i;
837 if j == 0 {
838 continue;
840 }
841 votes.push(BlockTransactionVotes {
842 block_ref: round_1_blocks[j as usize].reference(),
843 rejects: vec![j as u16],
844 });
845 }
846 block = block.set_transaction_votes(votes);
847 all_blocks.push(VerifiedBlock::new_for_test(block.build()));
848 }
849
850 let mut certifier = CertifierState::new(context.clone());
852 let certified_blocks =
853 certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
854 assert!(certified_blocks.is_empty());
855
856 let ancestors = all_blocks
858 .iter()
859 .filter_map(|b| {
860 if b.round() == 1 && b.author().value() == 3 {
861 Some(b.reference())
862 } else if b.round() == 2 {
863 assert_ne!(b.author().value(), 3);
864 Some(b.reference())
865 } else {
866 None
867 }
868 })
869 .collect::<Vec<_>>();
870 assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
871 let mut round_3_blocks = vec![];
872 for author in 0..4 {
873 let block = TestBlock::new(3, author)
874 .set_ancestors(ancestors.clone())
875 .set_transactions(transactions.clone());
876 round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
877 }
878
879 let mut certifier = CertifierState::new(context.clone());
881 certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
882 let proposed_block = round_3_blocks.pop().unwrap();
883 let mut certified_blocks =
884 certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
885 certified_blocks.extend(certifier.add_proposed_block(proposed_block));
886 assert_eq!(
887 certified_blocks.len(),
888 2,
889 "Certified blocks {:?}",
890 certified_blocks
891 .iter()
892 .map(|b| b.block.reference().to_string())
893 .join(",")
894 );
895 assert_eq!(
896 certified_blocks[0].block.reference(),
897 round_1_blocks[0].reference()
898 );
899 assert!(certified_blocks[0].rejected.is_empty());
900 assert_eq!(
901 certified_blocks[1].block.reference(),
902 round_1_blocks[2].reference()
903 );
904 assert_eq!(certified_blocks[1].rejected, vec![2]);
905 }
906
907 #[tokio::test]
909 async fn test_certify_randomized() {
910 telemetry_subscribers::init_for_testing();
911 let num_authorities: u32 = 7;
912 let (context, _) = Context::new_for_test(num_authorities as usize);
913 let context = Arc::new(context);
914
915 let num_rounds = 50;
917 let mut dag_builder = DagBuilder::new(context.clone());
918 dag_builder
919 .layers(1..=num_rounds)
920 .min_ancestor_links(false, None)
921 .build();
922 let all_blocks = dag_builder.all_blocks();
923
924 let mut certifier = CertifierState::new(context.clone());
926 let mut expected_certified_blocks =
927 certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
928 expected_certified_blocks.sort_by_key(|b| b.block.reference());
929
930 for _ in 0..100 {
932 let mut all_blocks = all_blocks.clone();
934 all_blocks.shuffle(&mut rand::thread_rng());
935 let mut certifier = CertifierState::new(context.clone());
936
937 let mut actual_certified_blocks = certifier
939 .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
940 actual_certified_blocks.sort_by_key(|b| b.block.reference());
941
942 assert_eq!(
944 actual_certified_blocks.len(),
945 expected_certified_blocks.len()
946 );
947 for (actual, expected) in actual_certified_blocks
948 .iter()
949 .zip(expected_certified_blocks.iter())
950 {
951 assert_eq!(actual.block.reference(), expected.block.reference());
952 assert_eq!(actual.rejected, expected.rejected);
953 }
954 }
955 }
956}