1use std::{collections::BTreeMap, sync::Arc, time::Duration};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_metrics::monitored_mpsc::UnboundedSender;
9use parking_lot::RwLock;
10use tracing::{debug, info};
11
12use crate::{
13 BlockAPI as _, CertifiedBlock, CertifiedBlocksOutput, VerifiedBlock,
14 block::{BlockTransactionVotes, GENESIS_ROUND},
15 block_verifier::BlockVerifier,
16 context::Context,
17 dag_state::DagState,
18 stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
/// Public handle for transaction certification.
///
/// Wraps the lock-protected `CertifierState` together with the collaborators the
/// methods of this type use: a block verifier (to vote on blocks during recovery),
/// the DAG state (store access, GC round, hard-link lookups) and the channel that
/// certified blocks are sent to. Every field is behind an `Arc` or is a channel
/// sender, and the type derives `Clone`.
#[derive(Clone)]
pub struct TransactionCertifier {
    // Votes and block contents tracked per block, plus the certifier's own GC round.
    certifier_state: Arc<RwLock<CertifierState>>,
    // Produces own reject votes for blocks in `recover_and_vote_on_blocks()`.
    block_verifier: Arc<dyn BlockVerifier>,
    // Read for the block store, the DAG GC round and `is_hard_linked()` checks.
    dag_state: Arc<RwLock<DagState>>,
    // Unbounded channel on which batches of certified blocks are emitted.
    certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
}
58
59impl TransactionCertifier {
60 pub fn new(
61 context: Arc<Context>,
62 block_verifier: Arc<dyn BlockVerifier>,
63 dag_state: Arc<RwLock<DagState>>,
64 certified_blocks_sender: UnboundedSender<CertifiedBlocksOutput>,
65 ) -> Self {
66 Self {
67 certifier_state: Arc::new(RwLock::new(CertifierState::new(context))),
68 block_verifier,
69 dag_state,
70 certified_blocks_sender,
71 }
72 }
73
74 pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
79 let context = self.certifier_state.read().context.clone();
80 if !context.protocol_config.mysticeti_fastpath() {
81 info!("Skipping certifier recovery in non-mysticeti fast path mode");
82 return;
83 }
84
85 let store = self.dag_state.read().store().clone();
86
87 let recovery_start_round = after_round + 1;
88 info!(
89 "Recovering certifier state from round {}",
90 recovery_start_round,
91 );
92
93 let authorities = context
94 .committee
95 .authorities()
96 .map(|(index, _)| index)
97 .collect::<Vec<_>>();
98 for authority_index in authorities {
99 let blocks = store
100 .scan_blocks_by_author(authority_index, recovery_start_round)
101 .unwrap();
102 info!(
103 "Recovering and voting on {} blocks for authority {} {}",
104 blocks.len(),
105 authority_index,
106 context.committee.authority(authority_index).hostname
107 );
108 self.recover_and_vote_on_blocks(blocks);
109 }
110 }
111
112 pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
119 let dag_state = self.dag_state.read();
120 let voted_blocks = blocks
121 .into_iter()
122 .map(|b| {
123 if b.round() <= dag_state.gc_round() || dag_state.is_hard_linked(&b.reference()) {
124 (b, vec![])
127 } else {
128 let reject_transaction_votes =
131 self.block_verifier.vote(&b).unwrap_or_else(|e| {
132 panic!("Failed to vote on block during recovery: {}", e)
133 });
134 (b, reject_transaction_votes)
135 }
136 })
137 .collect::<Vec<_>>();
138 self.certifier_state.write().add_voted_blocks(voted_blocks);
139 }
143
144 pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
147 let certified_blocks = self.certifier_state.write().add_voted_blocks(voted_blocks);
148 self.send_certified_blocks(certified_blocks);
149 }
150
151 pub(crate) fn add_proposed_block(&self, proposed_block: VerifiedBlock) {
154 let certified_blocks = self
155 .certifier_state
156 .write()
157 .add_proposed_block(proposed_block);
158 self.send_certified_blocks(certified_blocks);
159 }
160
161 fn send_certified_blocks(&self, certified_blocks: Vec<CertifiedBlock>) {
163 if certified_blocks.is_empty() {
164 return;
165 }
166 if let Err(e) = self.certified_blocks_sender.send(CertifiedBlocksOutput {
167 blocks: certified_blocks,
168 }) {
169 tracing::warn!("Failed to send certified blocks: {:?}", e);
170 }
171 }
172
173 pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
175 let mut votes = vec![];
176 let certifier_state = self.certifier_state.read();
177 for block_ref in block_refs {
178 if block_ref.round <= certifier_state.gc_round {
179 continue;
180 }
181 let vote_info = certifier_state.votes.get(&block_ref).unwrap_or_else(|| {
182 panic!("Ancestor block {} not found in certifier state", block_ref)
183 });
184 if !vote_info.own_reject_txn_votes.is_empty() {
185 votes.push(BlockTransactionVotes {
186 block_ref,
187 rejects: vote_info.own_reject_txn_votes.clone(),
188 });
189 }
190 }
191 votes
192 }
193
194 pub(crate) fn get_reject_votes(
198 &self,
199 block_ref: &BlockRef,
200 ) -> Option<Vec<(TransactionIndex, Stake)>> {
201 let accumulated_reject_votes = self
202 .certifier_state
203 .read()
204 .votes
205 .get(block_ref)?
206 .reject_txn_votes
207 .iter()
208 .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
209 .collect::<Vec<_>>();
210 Some(accumulated_reject_votes)
211 }
212
213 pub(crate) fn run_gc(&self, gc_round: Round) {
221 let dag_state_gc_round = self.dag_state.read().gc_round();
222 assert!(
223 gc_round <= dag_state_gc_round,
224 "TransactionCertifier cannot GC higher than DagState GC round ({} > {})",
225 gc_round,
226 dag_state_gc_round
227 );
228 self.certifier_state.write().update_gc_round(gc_round);
229 }
230}
231
/// Lock-protected state behind `TransactionCertifier`: the per-block vote bookkeeping
/// and the GC round below which blocks are ignored.
struct CertifierState {
    // Provides the committee, metrics, clock and protocol config used by the certifier.
    context: Arc<Context>,

    // Vote bookkeeping per block. Entries whose round is <= gc_round are removed
    // in update_gc_round().
    votes: BTreeMap<BlockRef, VoteInfo>,

    // Highest round that has been garbage collected from this state.
    // Starts at GENESIS_ROUND.
    gc_round: Round,
}
245
246impl CertifierState {
247 fn new(context: Arc<Context>) -> Self {
248 Self {
249 context,
250 votes: BTreeMap::new(),
251 gc_round: GENESIS_ROUND,
252 }
253 }
254
255 fn add_voted_blocks(
256 &mut self,
257 voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>,
258 ) -> Vec<CertifiedBlock> {
259 let mut certified_blocks = vec![];
260 for (voted_block, reject_txn_votes) in voted_blocks {
261 let blocks = self.add_voted_block(voted_block, reject_txn_votes);
262 certified_blocks.extend(blocks);
263 }
264
265 if !certified_blocks.is_empty() {
266 self.context
267 .metrics
268 .node_metrics
269 .certifier_output_blocks
270 .with_label_values(&["voted"])
271 .inc_by(certified_blocks.len() as u64);
272 }
273
274 certified_blocks
275 }
276
277 fn add_voted_block(
278 &mut self,
279 voted_block: VerifiedBlock,
280 reject_txn_votes: Vec<TransactionIndex>,
281 ) -> Vec<CertifiedBlock> {
282 if voted_block.round() <= self.gc_round {
283 return vec![];
285 }
286
287 let peer_hostname = &self
289 .context
290 .committee
291 .authority(voted_block.author())
292 .hostname;
293 self.context
294 .metrics
295 .node_metrics
296 .certifier_own_reject_votes
297 .with_label_values(&[peer_hostname])
298 .inc_by(reject_txn_votes.len() as u64);
299
300 let vote_info = self.votes.entry(voted_block.reference()).or_default();
302 if vote_info.block.is_some() {
303 return vec![];
305 }
306 vote_info.block = Some(voted_block.clone());
307 vote_info.own_reject_txn_votes = reject_txn_votes;
308
309 let mut certified_blocks = vec![];
310
311 let now = self.context.clock.timestamp_utc_ms();
312
313 for block_votes in voted_block.transaction_votes() {
315 if block_votes.block_ref.round <= self.gc_round {
316 continue;
318 }
319 let vote_info = self.votes.entry(block_votes.block_ref).or_default();
320 for reject in &block_votes.rejects {
321 vote_info
322 .reject_txn_votes
323 .entry(*reject)
324 .or_default()
325 .add_unique(voted_block.author(), &self.context.committee);
326 }
327 if let Some(certified_block) = vote_info.take_certified_output(&self.context) {
330 let authority_name = self
331 .context
332 .committee
333 .authority(certified_block.block.author())
334 .hostname
335 .clone();
336 self.context
337 .metrics
338 .node_metrics
339 .certifier_block_latency
340 .with_label_values(&[&authority_name])
341 .observe(
342 Duration::from_millis(
343 now.saturating_sub(certified_block.block.timestamp_ms()),
344 )
345 .as_secs_f64(),
346 );
347 certified_blocks.push(certified_block);
348 }
349 }
350
351 certified_blocks
352 }
353
354 fn add_proposed_block(&mut self, proposed_block: VerifiedBlock) -> Vec<CertifiedBlock> {
355 if proposed_block.round() <= self.gc_round + 2 {
356 return vec![];
361 }
362 debug!(
363 "Adding proposed block {}; gc round: {}",
364 proposed_block.reference(),
365 self.gc_round
366 );
367
368 if !self.votes.contains_key(&proposed_block.reference()) {
369 self.context
370 .metrics
371 .node_metrics
372 .certifier_missing_ancestor_during_certification
373 .with_label_values(&["proposed_block_not_found"])
374 .inc();
375 debug!(
376 "Proposed block {} not found in certifier state. GC round: {}",
377 proposed_block.reference(),
378 self.gc_round,
379 );
380 return vec![];
381 }
382
383 let now = self.context.clock.timestamp_utc_ms();
384
385 let mut certified_blocks = vec![];
389 for voting_ancestor in proposed_block.ancestors() {
390 if voting_ancestor.round + 1 != proposed_block.round() {
392 continue;
393 }
394 let Some(voting_info) = self.votes.get(voting_ancestor) else {
395 self.context
396 .metrics
397 .node_metrics
398 .certifier_missing_ancestor_during_certification
399 .with_label_values(&["voting_info_not_found"])
400 .inc();
401 debug!(
402 "Proposed block {}: voting info not found for ancestor {}",
403 proposed_block.reference(),
404 voting_ancestor
405 );
406 continue;
407 };
408 let Some(voting_block) = voting_info.block.clone() else {
409 self.context
410 .metrics
411 .node_metrics
412 .certifier_missing_ancestor_during_certification
413 .with_label_values(&["voting_block_not_found"])
414 .inc();
415 debug!(
416 "Proposed block {}: voting block not found for ancestor {}",
417 proposed_block.reference(),
418 voting_ancestor
419 );
420 continue;
421 };
422 for target_ancestor in voting_block.ancestors() {
423 if target_ancestor.round + 1 != voting_block.round() {
425 continue;
426 }
427 let Some(target_vote_info) = self.votes.get_mut(target_ancestor) else {
428 self.context
429 .metrics
430 .node_metrics
431 .certifier_missing_ancestor_during_certification
432 .with_label_values(&["target_vote_info_not_found"])
433 .inc();
434 debug!(
435 "Proposed block {}: target voting info not found for ancestor {}",
436 proposed_block.reference(),
437 target_ancestor
438 );
439 continue;
440 };
441 target_vote_info
442 .accept_block_votes
443 .add_unique(voting_block.author(), &self.context.committee);
444 if let Some(certified_block) = target_vote_info.take_certified_output(&self.context)
446 {
447 let authority_name = self
448 .context
449 .committee
450 .authority(certified_block.block.author())
451 .hostname
452 .clone();
453 self.context
454 .metrics
455 .node_metrics
456 .certifier_block_latency
457 .with_label_values(&[&authority_name])
458 .observe(
459 Duration::from_millis(
460 now.saturating_sub(certified_block.block.timestamp_ms()),
461 )
462 .as_secs_f64(),
463 );
464 certified_blocks.push(certified_block);
465 }
466 }
467 }
468
469 if !certified_blocks.is_empty() {
470 self.context
471 .metrics
472 .node_metrics
473 .certifier_output_blocks
474 .with_label_values(&["proposed"])
475 .inc_by(certified_blocks.len() as u64);
476 }
477
478 certified_blocks
479 }
480
481 fn update_gc_round(&mut self, gc_round: Round) {
483 self.gc_round = gc_round;
484 while let Some((block_ref, _)) = self.votes.first_key_value() {
485 if block_ref.round <= self.gc_round {
486 self.votes.pop_first();
487 } else {
488 break;
489 }
490 }
491
492 self.context
493 .metrics
494 .node_metrics
495 .certifier_gc_round
496 .set(self.gc_round as i64);
497 }
498}
499
/// Vote bookkeeping for a single block tracked by the certifier.
struct VoteInfo {
    // Content of the block. None until the block itself is added via add_voted_block();
    // an entry can be created earlier by reject votes carried in other blocks.
    block: Option<VerifiedBlock>,
    // Own reject votes on transactions of this block, as supplied to add_voted_block().
    own_reject_txn_votes: Vec<TransactionIndex>,
    // Stake of the authors of voting blocks that have this block as an ancestor one round
    // below them, accumulated in add_proposed_block().
    accept_block_votes: StakeAggregator<QuorumThreshold>,
    // Reject votes accumulated per transaction index of this block.
    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
    // Set once take_certified_output() has returned the block, so it is output only once.
    is_certified: bool,
}
517
518impl VoteInfo {
519 fn take_certified_output(&mut self, context: &Context) -> Option<CertifiedBlock> {
522 let committee = &context.committee;
523 if self.is_certified {
524 return None;
526 }
527 let Some(block) = self.block.as_ref() else {
528 return None;
530 };
531
532 let peer_hostname = &committee.authority(block.author()).hostname;
533
534 if !self.accept_block_votes.reached_threshold(committee) {
535 return None;
537 }
538 let mut rejected = vec![];
539 for (idx, reject_txn_votes) in &self.reject_txn_votes {
540 if reject_txn_votes.reached_threshold(committee) {
542 context
543 .metrics
544 .node_metrics
545 .certifier_rejected_transactions
546 .with_label_values(&[peer_hostname])
547 .inc();
548 rejected.push(*idx);
549 continue;
550 }
551 if self
566 .accept_block_votes
567 .stake()
568 .saturating_sub(reject_txn_votes.stake())
569 < committee.quorum_threshold()
570 {
571 return None;
572 }
573 }
574 let accepted_txn_count = block.transactions().len().saturating_sub(rejected.len());
576 tracing::trace!(
577 "Certified block {} accepted tx count: {accepted_txn_count} & rejected txn count: {}",
578 block.reference(),
579 rejected.len()
580 );
581 context
582 .metrics
583 .node_metrics
584 .certifier_accepted_transactions
585 .with_label_values(&[peer_hostname])
586 .inc_by(accepted_txn_count as u64);
587 self.is_certified = true;
588 Some(CertifiedBlock {
589 block: block.clone(),
590 rejected,
591 })
592 }
593}
594
595impl Default for VoteInfo {
596 fn default() -> Self {
597 Self {
598 block: None,
599 own_reject_txn_votes: vec![],
600 accept_block_votes: StakeAggregator::new(),
601 reject_txn_votes: BTreeMap::new(),
602 is_certified: false,
603 }
604 }
605}
606
// Unit tests for VoteInfo and CertifierState.
#[cfg(test)]
mod test {
    use consensus_config::AuthorityIndex;
    use itertools::Itertools;
    use rand::seq::SliceRandom as _;

    use crate::{
        TestBlock, Transaction, block::BlockTransactionVotes, context::Context,
        test_dag_builder::DagBuilder,
    };

    use super::*;

    // Exercises VoteInfo::take_certified_output() directly, with a 7-authority test committee.
    #[tokio::test]
    async fn test_vote_info_basic() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(7);
        let committee = &context.committee;

        // A block without any accept votes is not certified.
        {
            let mut vote_info = VoteInfo::default();
            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
            vote_info.block = Some(block.clone());

            assert!(vote_info.take_certified_output(&context).is_none());
        }

        // A block with 4 accept votes is not certified.
        {
            let mut vote_info = VoteInfo::default();
            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
            vote_info.block = Some(block.clone());
            for i in 0..4 {
                vote_info
                    .accept_block_votes
                    .add_unique(AuthorityIndex::new_for_test(i), committee);
            }

            assert!(vote_info.take_certified_output(&context).is_none());
        }

        // 5 accept votes but no block content: nothing can be output.
        {
            let mut vote_info = VoteInfo::default();
            for i in 0..5 {
                vote_info
                    .accept_block_votes
                    .add_unique(AuthorityIndex::new_for_test(i), committee);
            }

            assert!(vote_info.take_certified_output(&context).is_none());
        }

        // The 5th accept vote certifies the block, and the output is produced only once.
        {
            let mut vote_info = VoteInfo::default();
            let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
            vote_info.block = Some(block.clone());
            for i in 0..4 {
                vote_info
                    .accept_block_votes
                    .add_unique(AuthorityIndex::new_for_test(i), committee);
            }

            // Not certified yet with 4 accept votes.
            assert!(vote_info.take_certified_output(&context).is_none());

            // Add the 5th accept vote.
            vote_info
                .accept_block_votes
                .add_unique(AuthorityIndex::new_for_test(4), committee);

            // The block is now certified.
            let certified_block = vote_info.take_certified_output(&context).unwrap();
            assert_eq!(certified_block.block.reference(), block.reference());

            // A second call returns nothing.
            assert!(vote_info.take_certified_output(&context).is_none());
        }

        // A block with 5 accept votes where every voted-on transaction has 5 reject votes
        // is certified.
        {
            let mut vote_info = VoteInfo::default();
            // The block carries 7 transactions.
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(1, 1)
                    .set_transactions(vec![Transaction::new(vec![4; 8]); 7])
                    .build(),
            );
            vote_info.block = Some(block.clone());
            // Add 5 accept votes on the block.
            for i in 0..5 {
                vote_info
                    .accept_block_votes
                    .add_unique(AuthorityIndex::new_for_test(i), committee);
            }
            // For transaction indices 3 to 7 ...
            for reject_tx_idx in 3..8 {
                vote_info
                    .reject_txn_votes
                    .insert(reject_tx_idx, StakeAggregator::new());
                // ... add 5 reject votes each.
                for authority_idx in 0..5 {
                    vote_info
                        .reject_txn_votes
                        .get_mut(&reject_tx_idx)
                        .unwrap()
                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
                }
            }

            // The block is certified.
            let certified_block = vote_info.take_certified_output(&context).unwrap();
            assert_eq!(certified_block.block.reference(), block.reference());

            // A second call returns nothing.
            assert!(vote_info.take_certified_output(&context).is_none());
        }

        // A transaction with 4 reject votes blocks certification until the 5th reject vote
        // arrives.
        {
            let mut vote_info = VoteInfo::default();
            // The block carries 6 transactions.
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(1, 1)
                    .set_transactions(vec![Transaction::new(vec![4; 8]); 6])
                    .build(),
            );
            vote_info.block = Some(block.clone());
            // Add 5 accept votes on the block.
            for i in 0..5 {
                vote_info
                    .accept_block_votes
                    .add_unique(AuthorityIndex::new_for_test(i), committee);
            }
            // For transaction indices 3 to 5 ...
            for reject_tx_idx in 3..6 {
                vote_info
                    .reject_txn_votes
                    .insert(reject_tx_idx, StakeAggregator::new());
                // ... add 5 reject votes each.
                for authority_idx in 0..5 {
                    vote_info
                        .reject_txn_votes
                        .get_mut(&reject_tx_idx)
                        .unwrap()
                        .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
                }
            }
            // Replace the aggregator of transaction 5 with a fresh one holding only
            // 4 reject votes.
            vote_info.reject_txn_votes.insert(5, StakeAggregator::new());
            for authority_idx in 0..4 {
                vote_info
                    .reject_txn_votes
                    .get_mut(&5)
                    .unwrap()
                    .add_unique(AuthorityIndex::new_for_test(authority_idx), committee);
            }

            // Transaction 5 is neither rejected nor certified, so the block is not certified.
            assert!(vote_info.take_certified_output(&context).is_none());

            // Add the 5th reject vote on transaction 5.
            vote_info
                .reject_txn_votes
                .get_mut(&5)
                .unwrap()
                .add_unique(AuthorityIndex::new_for_test(4), committee);

            // The block is now certified.
            let certified_block = vote_info.take_certified_output(&context).unwrap();
            assert_eq!(certified_block.block.reference(), block.reference());

            // A second call returns nothing.
            assert!(vote_info.take_certified_output(&context).is_none());
        }
    }

    // Builds a small 3-round DAG by hand with a 4-authority committee and checks which
    // round 1 blocks get certified by a round 3 proposal.
    #[tokio::test]
    async fn test_certify_basic() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);

        // Build round 1 with the DAG builder.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layer(1).num_transactions(4).build();
        let round_1_blocks = dag_builder.all_blocks();
        let mut all_blocks = round_1_blocks.clone();

        // Adding only round 1 blocks certifies nothing.
        let mut certifier = CertifierState::new(context.clone());
        let certified_blocks = certifier
            .add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());
        assert!(certified_blocks.is_empty());

        // Create round 2 blocks from authorities 0, 1 and 2 only. Each links to the
        // round 1 blocks of authorities 0, 1 and 2.
        let transactions = (0..4)
            .map(|_| Transaction::new(vec![0_u8; 16]))
            .collect::<Vec<_>>();
        let ancestors = round_1_blocks
            .iter()
            .filter_map(|b| {
                if b.author().value() < 3 {
                    Some(b.reference())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        for author in 0..3 {
            let mut block = TestBlock::new(2, author)
                .set_ancestors(ancestors.clone())
                .set_transactions(transactions.clone());
            let mut votes = vec![];
            for i in 0..(3 - author) {
                let j = author + i;
                if j == 0 {
                    // The round 1 block of authority 0 never receives reject votes.
                    continue;
                }
                // Reject transaction j of the round 1 block of authority j. In total,
                // block 1 gets 2 reject votes and block 2 gets 3.
                votes.push(BlockTransactionVotes {
                    block_ref: round_1_blocks[j as usize].reference(),
                    rejects: vec![j as u16],
                });
            }
            block = block.set_transaction_votes(votes);
            all_blocks.push(VerifiedBlock::new_for_test(block.build()));
        }

        // Adding round 1 and round 2 blocks certifies nothing yet.
        let mut certifier = CertifierState::new(context.clone());
        let certified_blocks =
            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
        assert!(certified_blocks.is_empty());

        // Create round 3 blocks. Ancestors are the round 1 block of authority 3 and all
        // round 2 blocks.
        let ancestors = all_blocks
            .iter()
            .filter_map(|b| {
                if b.round() == 1 && b.author().value() == 3 {
                    Some(b.reference())
                } else if b.round() == 2 {
                    assert_ne!(b.author().value(), 3);
                    Some(b.reference())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        assert_eq!(ancestors.len(), 4, "Ancestors {:?}", ancestors);
        let mut round_3_blocks = vec![];
        for author in 0..4 {
            let block = TestBlock::new(3, author)
                .set_ancestors(ancestors.clone())
                .set_transactions(transactions.clone());
            round_3_blocks.push(VerifiedBlock::new_for_test(block.build()));
        }

        // Add all blocks, then add one round 3 block both as a voted block and as the
        // proposed block.
        let mut certifier = CertifierState::new(context.clone());
        certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
        let proposed_block = round_3_blocks.pop().unwrap();
        let mut certified_blocks =
            certifier.add_voted_blocks(vec![(proposed_block.clone(), vec![])]);
        certified_blocks.extend(certifier.add_proposed_block(proposed_block));
        // Two round 1 blocks are certified: the one from authority 0 with no rejected
        // transactions, and the one from authority 2 with transaction 2 rejected.
        assert_eq!(
            certified_blocks.len(),
            2,
            "Certified blocks {:?}",
            certified_blocks
                .iter()
                .map(|b| b.block.reference().to_string())
                .join(",")
        );
        assert_eq!(
            certified_blocks[0].block.reference(),
            round_1_blocks[0].reference()
        );
        assert!(certified_blocks[0].rejected.is_empty());
        assert_eq!(
            certified_blocks[1].block.reference(),
            round_1_blocks[2].reference()
        );
        assert_eq!(certified_blocks[1].rejected, vec![2]);
    }

    // Checks that the set of certified blocks does not depend on the order in which
    // voted blocks are added.
    #[tokio::test]
    async fn test_certify_randomized() {
        telemetry_subscribers::init_for_testing();
        let num_authorities: u32 = 7;
        let (context, _) = Context::new_for_test(num_authorities as usize);
        let context = Arc::new(context);

        // Build 50 rounds of blocks with the DAG builder.
        // NOTE(review): the exact effect of min_ancestor_links(false, None) is defined by
        // DagBuilder and is not visible here.
        let num_rounds = 50;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=num_rounds)
            .min_ancestor_links(false, None)
            .build();
        let all_blocks = dag_builder.all_blocks();

        // Baseline: certified blocks when adding in the builder's order.
        let mut certifier = CertifierState::new(context.clone());
        let mut expected_certified_blocks =
            certifier.add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
        expected_certified_blocks.sort_by_key(|b| b.block.reference());

        for _ in 0..100 {
            // Shuffle the blocks and feed them to a fresh certifier state.
            let mut all_blocks = all_blocks.clone();
            all_blocks.shuffle(&mut rand::thread_rng());
            let mut certifier = CertifierState::new(context.clone());

            let mut actual_certified_blocks = certifier
                .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
            actual_certified_blocks.sort_by_key(|b| b.block.reference());

            // The same blocks must be certified with the same rejected transactions.
            assert_eq!(
                actual_certified_blocks.len(),
                expected_certified_blocks.len()
            );
            for (actual, expected) in actual_certified_blocks
                .iter()
                .zip(expected_certified_blocks.iter())
            {
                assert_eq!(actual.block.reference(), expected.block.reference());
                assert_eq!(actual.rejected, expected.rejected);
            }
        }
    }
}