1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::{AuthorityIndex, Stake};
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use mysten_common::ZipDebugEqIteratorExt;
18use tokio::time::Instant;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22 CommittedSubDag,
23 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
24 commit::{
25 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
26 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
27 },
28 context::Context,
29 leader_scoring::{ReputationScores, ScoringSubdag},
30 storage::{Store, WriteBatch},
31 threshold_clock::ThresholdClock,
32};
33
34pub struct DagState {
42 context: Arc<Context>,
43
44 genesis: BTreeMap<BlockRef, VerifiedBlock>,
46
47 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
55
56 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
59
60 round_info: VecDeque<RoundInfo>,
64
65 threshold_clock: ThresholdClock,
67
68 evicted_rounds: Vec<Round>,
72
73 highest_accepted_round: Round,
75
76 last_commit: Option<TrustedCommit>,
78
79 last_commit_round_advancement_time: Option<std::time::Instant>,
81
82 last_committed_rounds: Vec<Round>,
84
85 scoring_subdag: ScoringSubdag,
88
89 pending_commit_votes: VecDeque<CommitVote>,
93
94 blocks_to_write: Vec<VerifiedBlock>,
97 commits_to_write: Vec<TrustedCommit>,
98
99 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
103
104 finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
106
107 store: Arc<dyn Store>,
109
110 cached_rounds: Round,
112}
113
114impl DagState {
115 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
117 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
118 let num_authorities = context.committee.size();
119
120 let genesis = genesis_blocks(context.as_ref())
121 .into_iter()
122 .map(|block| (block.reference(), block))
123 .collect();
124
125 let threshold_clock = ThresholdClock::new(1, context.clone());
126
127 let last_commit = store
128 .read_last_commit()
129 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
130
131 let commit_info = store
132 .read_last_commit_info()
133 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
134 let (mut last_committed_rounds, commit_recovery_start_index) =
135 if let Some((commit_ref, commit_info)) = commit_info {
136 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
137 (commit_info.committed_rounds, commit_ref.index + 1)
138 } else {
139 tracing::info!("Found no stored CommitInfo to recover from");
140 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
141 };
142
143 let mut unscored_committed_subdags = Vec::new();
144 let mut scoring_subdag = ScoringSubdag::new(context.clone());
145
146 if let Some(last_commit) = last_commit.as_ref() {
147 store
148 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
149 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
150 .iter()
151 .for_each(|commit| {
152 for block_ref in commit.blocks() {
153 last_committed_rounds[block_ref.author] =
154 max(last_committed_rounds[block_ref.author], block_ref.round);
155 }
156 let committed_subdag =
157 load_committed_subdag_from_store(store.as_ref(), commit.clone());
158 unscored_committed_subdags.push(committed_subdag);
159 });
160 }
161
162 tracing::info!(
163 "DagState was initialized with the following state: \
164 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
165 unscored_committed_subdags.len()
166 );
167
168 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
169
170 let mut state = Self {
171 context: context.clone(),
172 genesis,
173 recent_blocks: BTreeMap::new(),
174 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
175 round_info: VecDeque::new(),
176 threshold_clock,
177 highest_accepted_round: 0,
178 last_commit: last_commit.clone(),
179 last_commit_round_advancement_time: None,
180 last_committed_rounds: last_committed_rounds.clone(),
181 pending_commit_votes: VecDeque::new(),
182 blocks_to_write: vec![],
183 commits_to_write: vec![],
184 commit_info_to_write: vec![],
185 finalized_commits_to_write: vec![],
186 scoring_subdag,
187 store: store.clone(),
188 cached_rounds,
189 evicted_rounds: vec![0; num_authorities],
190 };
191
192 let mut recovered_blocks = Vec::new();
193 for (authority_index, _) in context.committee.authorities() {
194 let (blocks, eviction_round) = {
195 let last_block = state
198 .store
199 .scan_last_blocks_by_author(authority_index, 1, None)
200 .expect("Database error");
201 let last_block_round = last_block
202 .last()
203 .map(|b| b.round())
204 .unwrap_or(GENESIS_ROUND);
205
206 let eviction_round =
207 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
208 let blocks = state
209 .store
210 .scan_blocks_by_author(authority_index, eviction_round + 1)
211 .expect("Database error");
212
213 (blocks, eviction_round)
214 };
215
216 debug!(
217 "Recovered blocks {}: {:?}",
218 authority_index,
219 blocks
220 .iter()
221 .map(|b| b.reference())
222 .collect::<Vec<BlockRef>>()
223 );
224 recovered_blocks.extend(blocks);
225
226 state.evicted_rounds[authority_index] = eviction_round;
227 }
228
229 recovered_blocks.sort_by_key(|b| b.reference());
231 for block in &recovered_blocks {
232 state.update_block_metadata(block);
233 }
234
235 if let Some(last_commit) = last_commit {
236 let mut index = last_commit.index();
237 let gc_round = state.gc_round();
238 info!(
239 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
240 index, gc_round
241 );
242
243 loop {
244 let commits = store
245 .scan_commits((index..=index).into())
246 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
247 let Some(commit) = commits.first() else {
248 info!("Recovering finished up to index {index}, no more commits to recover");
249 break;
250 };
251
252 if gc_round > 0 && commit.leader().round <= gc_round {
254 info!(
255 "Recovering finished, reached commit leader round {} <= gc_round {}",
256 commit.leader().round,
257 gc_round
258 );
259 break;
260 }
261
262 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
263 debug!(
264 "Setting block {:?} as committed based on commit {:?}",
265 block_ref,
266 commit.index()
267 );
268 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
269 });
270
271 index = index.saturating_sub(1);
273 if index == 0 {
274 break;
275 }
276 }
277 }
278
279 let proposed_blocks = store
281 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
282 .expect("Database error");
283 for block in proposed_blocks {
284 state.link_causal_history(block.reference());
285 }
286
287 state
288 }
289
290 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
292 assert_ne!(
293 block.round(),
294 0,
295 "Genesis block should not be accepted into DAG."
296 );
297
298 let block_ref = block.reference();
299 if self.contains_block(&block_ref) {
300 return;
301 }
302
303 let now = self.context.clock.timestamp_utc_ms();
304 if block.timestamp_ms() > now {
305 trace!(
306 "Block {:?} with timestamp {} is greater than local timestamp {}.",
307 block,
308 block.timestamp_ms(),
309 now,
310 );
311 }
312 let hostname = &self.context.committee.authority(block_ref.author).hostname;
313 self.context
314 .metrics
315 .node_metrics
316 .accepted_block_time_drift_ms
317 .with_label_values(&[hostname])
318 .inc_by(block.timestamp_ms().saturating_sub(now));
319
320 if block_ref.author == self.context.own_index {
323 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
324 if !self
325 .context
326 .parameters
327 .internal
328 .skip_equivocation_validation
329 {
330 assert!(
331 existing_blocks.is_empty(),
332 "Block Rejected! Attempted to add block {block:#?} to own slot where \
333 block(s) {existing_blocks:#?} already exists."
334 );
335 }
336 }
337 self.update_block_metadata(&block);
338 self.blocks_to_write.push(block);
339 let source = if self.context.own_index == block_ref.author {
340 "own"
341 } else {
342 "others"
343 };
344 self.context
345 .metrics
346 .node_metrics
347 .accepted_blocks
348 .with_label_values(&[source])
349 .inc();
350 }
351
352 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
354 let block_ref = block.reference();
355 self.recent_blocks
356 .insert(block_ref, BlockInfo::new(block.clone()));
357 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
358 if self.context.protocol_config.enable_v3() {
359 for ancestor in block.ancestors() {
361 if ancestor.round + 1 != block_ref.round || ancestor.round <= self.gc_round() {
363 continue;
364 }
365 let block_info = self.recent_blocks.get_mut(ancestor).unwrap_or_else(|| {
366 panic!(
367 "Parent block {} of block {} does not exist",
368 ancestor, block_ref
369 )
370 });
371 if block_info.children.insert(block_ref)
372 && block_info.children_authorities.insert(block_ref.author)
373 {
374 let child_stake = self.context.committee.stake(block_ref.author);
375 block_info.total_children_stake += child_stake;
376 }
377 }
378 self.update_round_info(block);
379 }
380 if self.threshold_clock.add_block(block_ref) {
381 if let Some(last_proposed_block) = self.get_last_proposed_block()
383 && last_proposed_block.round() == block_ref.round
384 {
385 let quorum_delay_ms = self
386 .context
387 .clock
388 .timestamp_utc_ms()
389 .saturating_sub(last_proposed_block.timestamp_ms());
390 self.context
391 .metrics
392 .node_metrics
393 .quorum_receive_latency
394 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
395 }
396 }
397
398 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
399 self.context
400 .metrics
401 .node_metrics
402 .highest_accepted_round
403 .set(self.highest_accepted_round as i64);
404
405 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
406 .last()
407 .map(|block_ref| block_ref.round)
408 .expect("There should be by now at least one block ref");
409 let hostname = &self.context.committee.authority(block_ref.author).hostname;
410 self.context
411 .metrics
412 .node_metrics
413 .highest_accepted_authority_round
414 .with_label_values(&[hostname])
415 .set(highest_accepted_round_for_author as i64);
416 }
417
418 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
420 debug!(
421 "Accepting blocks: {}",
422 blocks.iter().map(|b| b.reference().to_string()).join(",")
423 );
424 for block in blocks {
425 self.accept_block(block);
426 }
427 }
428
429 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
432 self.get_blocks(&[*reference])
433 .pop()
434 .expect("Exactly one element should be returned")
435 }
436
437 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
440 if block_refs.is_empty() {
441 return vec![];
442 }
443
444 let mut blocks = vec![None; block_refs.len()];
445 let mut missing = Vec::new();
446
447 for (index, block_ref) in block_refs.iter().enumerate() {
448 if block_ref.round == GENESIS_ROUND {
449 if let Some(block) = self.genesis.get(block_ref) {
451 blocks[index] = Some(block.clone());
452 }
453 continue;
454 }
455 if let Some(block_info) = self.recent_blocks.get(block_ref) {
456 blocks[index] = Some(block_info.block.clone());
457 continue;
458 }
459 missing.push((index, block_ref));
460 }
461
462 if missing.is_empty() {
463 return blocks;
464 }
465
466 let missing_refs = missing
467 .iter()
468 .map(|(_, block_ref)| **block_ref)
469 .collect::<Vec<_>>();
470 let store_results = self
471 .store
472 .read_blocks(&missing_refs)
473 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
474 self.context
475 .metrics
476 .node_metrics
477 .dag_state_store_read_count
478 .with_label_values(&["get_blocks"])
479 .inc();
480
481 for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
482 blocks[index] = result;
483 }
484
485 blocks
486 }
487
488 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
491 let mut blocks = vec![];
495 for (_block_ref, block_info) in self.recent_blocks.range((
496 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
497 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
498 )) {
499 blocks.push(block_info.block.clone())
500 }
501 blocks
502 }
503
504 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
507 if round <= self.last_commit_round() {
508 panic!("Round {} have committed blocks!", round);
509 }
510
511 let mut blocks = vec![];
512 for (_block_ref, block_info) in self.recent_blocks.range((
513 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
514 Excluded(BlockRef::new(
515 round + 1,
516 AuthorityIndex::ZERO,
517 BlockDigest::MIN,
518 )),
519 )) {
520 blocks.push(block_info.block.clone())
521 }
522 blocks
523 }
524
525 #[cfg(test)]
526 pub(crate) fn get_block_children(&self, block_ref: &BlockRef) -> Option<Vec<BlockRef>> {
527 if block_ref.round <= self.gc_round() {
528 return None;
529 }
530 self.recent_blocks
531 .get(block_ref)
532 .map(|block_info| block_info.children.iter().cloned().collect())
533 }
534
535 #[cfg(test)]
536 pub(crate) fn get_block_children_authorities(
537 &self,
538 block_ref: &BlockRef,
539 ) -> Option<BTreeSet<AuthorityIndex>> {
540 if block_ref.round <= self.gc_round() {
541 return None;
542 }
543 self.recent_blocks
544 .get(block_ref)
545 .map(|block_info| block_info.children_authorities.clone())
546 }
547
548 #[cfg(test)]
549 pub(crate) fn get_block_total_children_stake(&self, block_ref: &BlockRef) -> Option<Stake> {
550 if block_ref.round <= self.gc_round() {
551 return None;
552 }
553 self.recent_blocks
554 .get(block_ref)
555 .map(|block_info| block_info.total_children_stake)
556 }
557
558 pub(crate) fn ancestors_at_round(
560 &self,
561 later_block: &VerifiedBlock,
562 earlier_round: Round,
563 ) -> Vec<VerifiedBlock> {
564 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
566 while !linked.is_empty() {
567 let round = linked.last().unwrap().round;
568 if round <= earlier_round {
570 break;
571 }
572 let block_ref = linked.pop_last().unwrap();
573 let Some(block) = self.get_block(&block_ref) else {
574 panic!("Block {:?} should exist in DAG!", block_ref);
575 };
576 linked.extend(block.ancestors().iter().cloned());
577 }
578 linked
579 .range((
580 Included(BlockRef::new(
581 earlier_round,
582 AuthorityIndex::ZERO,
583 BlockDigest::MIN,
584 )),
585 Unbounded,
586 ))
587 .map(|r| {
588 self.get_block(r)
589 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
590 .clone()
591 })
592 .collect()
593 }
594
595 pub(crate) fn get_last_proposed_block(&self) -> Option<VerifiedBlock> {
599 if self.context.is_validator() {
600 Some(self.get_last_block_for_authority(self.context.own_index))
601 } else {
602 None
603 }
604 }
605
606 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
609 if let Some(last) = self.recent_refs_by_authority[authority].last() {
610 return self
611 .recent_blocks
612 .get(last)
613 .expect("Block should be found in recent blocks")
614 .block
615 .clone();
616 }
617
618 let (_, genesis_block) = self
620 .genesis
621 .iter()
622 .find(|(block_ref, _)| block_ref.author == authority)
623 .expect("Genesis should be found for authority {authority_index}");
624 genesis_block.clone()
625 }
626
627 pub(crate) fn get_cached_blocks(
633 &self,
634 authority: AuthorityIndex,
635 start: Round,
636 ) -> Vec<VerifiedBlock> {
637 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
638 }
639
640 pub(crate) fn get_cached_blocks_in_range(
643 &self,
644 authority: AuthorityIndex,
645 start_round: Round,
646 end_round: Round,
647 limit: usize,
648 ) -> Vec<VerifiedBlock> {
649 if start_round >= end_round || limit == 0 {
650 return vec![];
651 }
652
653 let mut blocks = vec![];
654 for block_ref in self.recent_refs_by_authority[authority].range((
655 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
656 Excluded(BlockRef::new(
657 end_round,
658 AuthorityIndex::MIN,
659 BlockDigest::MIN,
660 )),
661 )) {
662 let block_info = self
663 .recent_blocks
664 .get(block_ref)
665 .expect("Block should exist in recent blocks");
666 blocks.push(block_info.block.clone());
667 if blocks.len() >= limit {
668 break;
669 }
670 }
671 blocks
672 }
673
674 pub(crate) fn get_last_cached_block_in_range(
676 &self,
677 authority: AuthorityIndex,
678 start_round: Round,
679 end_round: Round,
680 ) -> Option<VerifiedBlock> {
681 if start_round >= end_round {
682 return None;
683 }
684
685 let block_ref = self.recent_refs_by_authority[authority]
686 .range((
687 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
688 Excluded(BlockRef::new(
689 end_round,
690 AuthorityIndex::MIN,
691 BlockDigest::MIN,
692 )),
693 ))
694 .last()?;
695
696 self.recent_blocks
697 .get(block_ref)
698 .map(|block_info| block_info.block.clone())
699 }
700
701 pub(crate) fn get_last_cached_block_per_authority(
708 &self,
709 end_round: Round,
710 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
711 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
713 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
714
715 if end_round == GENESIS_ROUND {
716 panic!(
717 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
718 );
719 }
720
721 if end_round == GENESIS_ROUND + 1 {
722 return blocks.into_iter().map(|b| (b, vec![])).collect();
723 }
724
725 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
726 let authority_index = self
727 .context
728 .committee
729 .to_authority_index(authority_index)
730 .unwrap();
731
732 let last_evicted_round = self.evicted_rounds[authority_index];
733 if end_round.saturating_sub(1) <= last_evicted_round {
734 panic!(
735 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
736 );
737 }
738
739 let block_ref_iter = block_refs
740 .range((
741 Included(BlockRef::new(
742 last_evicted_round + 1,
743 authority_index,
744 BlockDigest::MIN,
745 )),
746 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
747 ))
748 .rev();
749
750 let mut last_round = 0;
751 for block_ref in block_ref_iter {
752 if last_round == 0 {
753 last_round = block_ref.round;
754 let block_info = self
755 .recent_blocks
756 .get(block_ref)
757 .expect("Block should exist in recent blocks");
758 blocks[authority_index] = block_info.block.clone();
759 continue;
760 }
761 if block_ref.round < last_round {
762 break;
763 }
764 equivocating_blocks[authority_index].push(*block_ref);
765 }
766 }
767
768 blocks
769 .into_iter()
770 .zip_debug_eq(equivocating_blocks)
771 .collect()
772 }
773
774 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
777 if slot.round == GENESIS_ROUND {
779 return true;
780 }
781
782 let eviction_round = self.evicted_rounds[slot.authority];
783 if slot.round <= eviction_round {
784 panic!(
785 "{}",
786 format!(
787 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
788 )
789 );
790 }
791
792 let mut result = self.recent_refs_by_authority[slot.authority].range((
793 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
794 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
795 ));
796 result.next().is_some()
797 }
798
799 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
802 let mut exist = vec![false; block_refs.len()];
803 let mut missing = Vec::new();
804
805 for (index, block_ref) in block_refs.into_iter().enumerate() {
806 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
807 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
808 exist[index] = true;
809 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
810 {
811 exist[index] = false;
815 } else {
816 missing.push((index, block_ref));
817 }
818 }
819
820 if missing.is_empty() {
821 return exist;
822 }
823
824 let missing_refs = missing
825 .iter()
826 .map(|(_, block_ref)| *block_ref)
827 .collect::<Vec<_>>();
828 let store_results = self
829 .store
830 .contains_blocks(&missing_refs)
831 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
832 self.context
833 .metrics
834 .node_metrics
835 .dag_state_store_read_count
836 .with_label_values(&["contains_blocks"])
837 .inc();
838
839 for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
840 exist[index] = result;
841 }
842
843 exist
844 }
845
846 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
847 let blocks = self.contains_blocks(vec![*block_ref]);
848 blocks.first().cloned().unwrap()
849 }
850
851 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
854 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
855 if !block_info.committed {
856 block_info.committed = true;
857 return true;
858 }
859 false
860 } else {
861 panic!(
862 "Block {:?} not found in cache to set as committed.",
863 block_ref
864 );
865 }
866 }
867
868 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
870 self.recent_blocks
871 .get(block_ref)
872 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
873 .committed
874 }
875
876 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
882 let gc_round = self.gc_round();
883 let mut linked_blocks = vec![];
884 let mut targets = VecDeque::new();
885 targets.push_back(root_block);
886 while let Some(block_ref) = targets.pop_front() {
887 if block_ref.round <= gc_round {
895 continue;
896 }
897 let block_info = self
898 .recent_blocks
899 .get_mut(&block_ref)
900 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
901 if block_info.included {
902 continue;
903 }
904 linked_blocks.push(block_ref);
905 block_info.included = true;
906 targets.extend(block_info.block.ancestors().iter());
907 }
908 linked_blocks
909 }
910
911 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
914 self.recent_blocks
915 .get(block_ref)
916 .unwrap_or_else(|| {
917 panic!(
918 "Attempted to query for inclusion status for a block not in cached data {}",
919 block_ref
920 )
921 })
922 .included
923 }
924
925 pub(crate) fn threshold_clock_round(&self) -> Round {
926 self.threshold_clock.get_round()
927 }
928
929 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
931 self.threshold_clock.get_quorum_ts()
932 }
933
934 pub(crate) fn highest_accepted_round(&self) -> Round {
935 self.highest_accepted_round
936 }
937
938 #[cfg(test)]
941 pub(crate) fn round_info(&self, round: Round) -> Option<&RoundInfo> {
942 let front_round = self.round_info.front()?.round;
943 if round < front_round || round <= self.gc_round() {
944 return None;
945 }
946 let round_info = self.round_info.get((round - front_round) as usize)?;
947 assert_eq!(
948 round_info.round, round,
949 "RoundInfo round {} does not match requested round {}. RoundInfo should be contiguous.",
950 round_info.round, round
951 );
952 Some(round_info)
953 }
954
955 fn update_round_info(&mut self, block: &VerifiedBlock) {
958 let block_ref = block.reference();
959
960 let gc_round = self.gc_round();
962 if block.round() <= gc_round {
963 return;
964 }
965
966 let next_round = self
969 .round_info
970 .back()
971 .map(|info| info.round + 1)
972 .unwrap_or(gc_round + 1);
974 assert!(
976 block.round() <= next_round,
977 "Attempted to update round info for block {block_ref} with round higher than next round {next_round}"
978 );
979 if block.round() == next_round {
980 self.round_info.push_back(RoundInfo {
981 round: block.round(),
982 authorities: BTreeSet::new(),
983 total_stake: 0,
984 });
985 }
986
987 let front_round = self
989 .round_info
990 .front()
991 .expect("round_info non-empty after extend")
992 .round;
993 let index = (block.round() - front_round) as usize;
994 let info = &mut self.round_info[index];
995 if info.authorities.insert(block_ref.author) {
996 info.total_stake += self.context.committee.stake(block_ref.author);
997 }
998 }
999
1000 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
1003 let time_diff = if let Some(last_commit) = &self.last_commit {
1004 if commit.index() <= last_commit.index() {
1005 error!(
1006 "New commit index {} <= last commit index {}!",
1007 commit.index(),
1008 last_commit.index()
1009 );
1010 return;
1011 }
1012 assert_eq!(commit.index(), last_commit.index() + 1);
1013
1014 if commit.timestamp_ms() < last_commit.timestamp_ms() {
1015 panic!(
1016 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
1017 last_commit, commit
1018 );
1019 }
1020 commit
1021 .timestamp_ms()
1022 .saturating_sub(last_commit.timestamp_ms())
1023 } else {
1024 assert_eq!(commit.index(), 1);
1025 0
1026 };
1027
1028 self.context
1029 .metrics
1030 .node_metrics
1031 .last_commit_time_diff
1032 .observe(time_diff as f64);
1033
1034 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
1035 previous_commit.round() < commit.round()
1036 } else {
1037 true
1038 };
1039
1040 self.last_commit = Some(commit.clone());
1041
1042 if commit_round_advanced {
1043 let now = std::time::Instant::now();
1044 if let Some(previous_time) = self.last_commit_round_advancement_time {
1045 self.context
1046 .metrics
1047 .node_metrics
1048 .commit_round_advancement_interval
1049 .observe(now.duration_since(previous_time).as_secs_f64())
1050 }
1051 self.last_commit_round_advancement_time = Some(now);
1052 }
1053
1054 for block_ref in commit.blocks().iter() {
1055 self.last_committed_rounds[block_ref.author] = max(
1056 self.last_committed_rounds[block_ref.author],
1057 block_ref.round,
1058 );
1059 }
1060
1061 for (i, round) in self.last_committed_rounds.iter().enumerate() {
1062 let index = self.context.committee.to_authority_index(i).unwrap();
1063 let hostname = &self.context.committee.authority(index).hostname;
1064 self.context
1065 .metrics
1066 .node_metrics
1067 .last_committed_authority_round
1068 .with_label_values(&[hostname])
1069 .set((*round).into());
1070 }
1071
1072 self.pending_commit_votes.push_back(commit.reference());
1073 self.commits_to_write.push(commit);
1074 }
1075
1076 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
1078 self.commits_to_write.extend(commits);
1079 }
1080
1081 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
1082 assert!(
1083 self.commits_to_write.is_empty(),
1084 "Commits to write should be empty. {:?}",
1085 self.commits_to_write,
1086 );
1087 }
1088
1089 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
1090 assert!(self.scoring_subdag.is_empty());
1094
1095 let commit_info = CommitInfo {
1096 committed_rounds: self.last_committed_rounds.clone(),
1097 reputation_scores,
1098 };
1099 let last_commit = self
1100 .last_commit
1101 .as_ref()
1102 .expect("Last commit should already be set.");
1103 self.commit_info_to_write
1104 .push((last_commit.reference(), commit_info));
1105 }
1106
1107 pub(crate) fn add_finalized_commit(
1108 &mut self,
1109 commit_ref: CommitRef,
1110 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
1111 ) {
1112 self.finalized_commits_to_write
1113 .push((commit_ref, rejected_transactions));
1114 }
1115
1116 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
1117 let mut votes = Vec::new();
1118 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
1119 votes.push(self.pending_commit_votes.pop_front().unwrap());
1120 }
1121 votes
1122 }
1123
1124 pub(crate) fn last_commit_index(&self) -> CommitIndex {
1126 match &self.last_commit {
1127 Some(commit) => commit.index(),
1128 None => 0,
1129 }
1130 }
1131
1132 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1134 match &self.last_commit {
1135 Some(commit) => commit.digest(),
1136 None => CommitDigest::MIN,
1137 }
1138 }
1139
1140 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1142 match &self.last_commit {
1143 Some(commit) => commit.timestamp_ms(),
1144 None => 0,
1145 }
1146 }
1147
1148 pub(crate) fn last_commit_leader(&self) -> Slot {
1150 match &self.last_commit {
1151 Some(commit) => commit.leader().into(),
1152 None => self
1153 .genesis
1154 .iter()
1155 .next()
1156 .map(|(genesis_ref, _)| *genesis_ref)
1157 .expect("Genesis blocks should always be available.")
1158 .into(),
1159 }
1160 }
1161
1162 pub(crate) fn last_commit_round(&self) -> Round {
1164 match &self.last_commit {
1165 Some(commit) => commit.leader().round,
1166 None => 0,
1167 }
1168 }
1169
1170 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1172 self.last_committed_rounds.clone()
1173 }
1174
1175 pub(crate) fn gc_round(&self) -> Round {
1179 self.calculate_gc_round(self.last_commit_round())
1180 }
1181
1182 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1185 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1186 }
1187
1188 pub(crate) fn flush(&mut self) {
1198 let _s = self
1199 .context
1200 .metrics
1201 .node_metrics
1202 .scope_processing_time
1203 .with_label_values(&["DagState::flush"])
1204 .start_timer();
1205
1206 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1208 let pending_commits = std::mem::take(&mut self.commits_to_write);
1209 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1210 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1211 if pending_blocks.is_empty()
1212 && pending_commits.is_empty()
1213 && pending_commit_info.is_empty()
1214 && pending_finalized_commits.is_empty()
1215 {
1216 return;
1217 }
1218
1219 debug!(
1220 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1221 pending_blocks.len(),
1222 pending_blocks
1223 .iter()
1224 .map(|b| b.reference().to_string())
1225 .join(","),
1226 pending_commits.len(),
1227 pending_commits
1228 .iter()
1229 .map(|c| c.reference().to_string())
1230 .join(","),
1231 pending_commit_info.len(),
1232 pending_commit_info
1233 .iter()
1234 .map(|(commit_ref, _)| commit_ref.to_string())
1235 .join(","),
1236 pending_finalized_commits.len(),
1237 pending_finalized_commits
1238 .iter()
1239 .map(|(commit_ref, _)| commit_ref.to_string())
1240 .join(","),
1241 );
1242 self.store
1243 .write(WriteBatch::new(
1244 pending_blocks,
1245 pending_commits,
1246 pending_commit_info,
1247 pending_finalized_commits,
1248 ))
1249 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1250 self.context
1251 .metrics
1252 .node_metrics
1253 .dag_state_store_write_count
1254 .inc();
1255
1256 for (authority_index, _) in self.context.committee.authorities() {
1258 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1259 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1260 if block_ref.round <= eviction_round {
1261 self.recent_blocks.remove(block_ref);
1262 self.recent_refs_by_authority[authority_index].pop_first();
1263 } else {
1264 break;
1265 }
1266 }
1267 self.evicted_rounds[authority_index] = eviction_round;
1268 }
1269
1270 while let Some(info) = self.round_info.front() {
1272 if info.round <= self.gc_round() {
1273 self.round_info.pop_front();
1274 } else {
1275 break;
1276 }
1277 }
1278
1279 let metrics = &self.context.metrics.node_metrics;
1280 metrics
1281 .dag_state_recent_blocks
1282 .set(self.recent_blocks.len() as i64);
1283 metrics.dag_state_recent_refs.set(
1284 self.recent_refs_by_authority
1285 .iter()
1286 .map(BTreeSet::len)
1287 .sum::<usize>() as i64,
1288 );
1289 }
1290
1291 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1292 self.store
1293 .read_last_commit_info()
1294 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1295 }
1296
1297 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1298 self.scoring_subdag.add_subdags(scoring_subdags);
1299 }
1300
1301 pub(crate) fn clear_scoring_subdag(&mut self) {
1302 self.scoring_subdag.clear();
1303 }
1304
1305 pub(crate) fn scoring_subdags_count(&self) -> usize {
1306 self.scoring_subdag.scored_subdags_count()
1307 }
1308
1309 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1310 self.scoring_subdag.calculate_distributed_vote_scores()
1311 }
1312
1313 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1314 self.scoring_subdag
1315 .commit_range
1316 .as_ref()
1317 .expect("commit range should exist for scoring subdag")
1318 .end()
1319 }
1320
1321 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1325 let last_round = self.recent_refs_by_authority[authority_index]
1326 .last()
1327 .map(|block_ref| block_ref.round)
1328 .unwrap_or(GENESIS_ROUND);
1329
1330 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1331 }
1332
1333 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1336 gc_round.min(last_round.saturating_sub(cached_rounds))
1337 }
1338
1339 pub(crate) fn store(&self) -> Arc<dyn Store> {
1341 self.store.clone()
1342 }
1343
1344 #[cfg(test)]
1347 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1348 for round in
1351 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1352 {
1353 if round == GENESIS_ROUND {
1354 return self.genesis_blocks();
1355 }
1356 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1357 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1358
1359 let blocks = self.get_uncommitted_blocks_at_round(round);
1361 for block in &blocks {
1362 if quorum.add(block.author(), &self.context.committee) {
1363 return blocks;
1364 }
1365 }
1366 }
1367
1368 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1369 }
1370
1371 #[cfg(test)]
1372 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1373 self.genesis.values().cloned().collect()
1374 }
1375
1376 #[cfg(test)]
1377 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1378 self.last_commit = Some(commit);
1379 }
1380}
1381
1382struct BlockInfo {
1383 block: VerifiedBlock,
1384
1385 children: BTreeSet<BlockRef>,
1388 children_authorities: BTreeSet<AuthorityIndex>,
1390 total_children_stake: Stake,
1392
1393 committed: bool,
1395 included: bool,
1402}
1403
1404impl BlockInfo {
1405 fn new(block: VerifiedBlock) -> Self {
1406 Self {
1407 block,
1408 children: BTreeSet::new(),
1409 children_authorities: BTreeSet::new(),
1410 total_children_stake: 0,
1411 committed: false,
1412 included: false,
1413 }
1414 }
1415}
1416
1417pub(crate) struct RoundInfo {
1421 pub(crate) round: Round,
1422 pub(crate) authorities: BTreeSet<AuthorityIndex>,
1424 pub(crate) total_stake: Stake,
1426}
1427
1428#[cfg(test)]
1429mod test {
1430 use std::vec;
1431
1432 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1433 use parking_lot::RwLock;
1434
1435 use super::*;
1436 use crate::{
1437 block::{TestBlock, VerifiedBlock},
1438 storage::{WriteBatch, mem_store::MemStore},
1439 test_dag_builder::DagBuilder,
1440 test_dag_parser::parse_dag,
1441 };
1442
1443 #[tokio::test]
1444 async fn test_get_blocks() {
1445 let (context, _) = Context::new_for_test(4);
1446 let context = Arc::new(context);
1447 let store = Arc::new(MemStore::new());
1448 let mut dag_state = DagState::new(context.clone(), store.clone());
1449 let own_index = AuthorityIndex::new_for_test(0);
1450
1451 let num_rounds: u32 = 10;
1453 let non_existent_round: u32 = 100;
1454 let num_authorities: u32 = 3;
1455 let num_blocks_per_slot: usize = 3;
1456 let mut blocks = BTreeMap::new();
1457 for round in 1..=num_rounds {
1458 for author in 0..num_authorities {
1459 let base_ts = round as BlockTimestampMs * 1000;
1461 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1462 let block = VerifiedBlock::new_for_test(
1463 TestBlock::new(round, author)
1464 .set_timestamp_ms(timestamp)
1465 .build(),
1466 );
1467 dag_state.accept_block(block.clone());
1468 blocks.insert(block.reference(), block);
1469
1470 if AuthorityIndex::new_for_test(author) == own_index {
1472 break;
1473 }
1474 }
1475 }
1476 }
1477
1478 for (r, block) in &blocks {
1480 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1481 }
1482
1483 let last_ref = blocks.keys().last().unwrap();
1485 assert!(
1486 dag_state
1487 .get_block(&BlockRef::new(
1488 last_ref.round,
1489 last_ref.author,
1490 BlockDigest::MIN
1491 ))
1492 .is_none()
1493 );
1494
1495 for round in 1..=num_rounds {
1497 for author in 0..num_authorities {
1498 let slot = Slot::new(
1499 round,
1500 context
1501 .committee
1502 .to_authority_index(author as usize)
1503 .unwrap(),
1504 );
1505 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1506
1507 if AuthorityIndex::new_for_test(author) == own_index {
1509 assert_eq!(blocks.len(), 1);
1510 } else {
1511 assert_eq!(blocks.len(), num_blocks_per_slot);
1512 }
1513
1514 for b in blocks {
1515 assert_eq!(b.round(), round);
1516 assert_eq!(
1517 b.author(),
1518 context
1519 .committee
1520 .to_authority_index(author as usize)
1521 .unwrap()
1522 );
1523 }
1524 }
1525 }
1526
1527 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1529 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1530
1531 for round in 1..=num_rounds {
1533 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1534 assert_eq!(
1537 blocks.len(),
1538 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1539 );
1540 for b in blocks {
1541 assert_eq!(b.round(), round);
1542 }
1543 }
1544
1545 assert!(
1547 dag_state
1548 .get_uncommitted_blocks_at_round(non_existent_round)
1549 .is_empty()
1550 );
1551 }
1552
1553 #[tokio::test]
1554 async fn test_ancestors_at_uncommitted_round() {
1555 let (context, _) = Context::new_for_test(4);
1557 let context = Arc::new(context);
1558 let store = Arc::new(MemStore::new());
1559 let mut dag_state = DagState::new(context.clone(), store.clone());
1560
1561 let round_10_refs: Vec<_> = (0..4)
1565 .map(|a| {
1566 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1567 .reference()
1568 })
1569 .collect();
1570
1571 let round_11 = [
1573 VerifiedBlock::new_for_test(
1575 TestBlock::new(11, 0)
1576 .set_timestamp_ms(1100)
1577 .set_ancestors(round_10_refs.clone())
1578 .build(),
1579 ),
1580 VerifiedBlock::new_for_test(
1583 TestBlock::new(11, 1)
1584 .set_timestamp_ms(1110)
1585 .set_ancestors(round_10_refs.clone())
1586 .build(),
1587 ),
1588 VerifiedBlock::new_for_test(
1590 TestBlock::new(11, 1)
1591 .set_timestamp_ms(1111)
1592 .set_ancestors(round_10_refs.clone())
1593 .build(),
1594 ),
1595 VerifiedBlock::new_for_test(
1597 TestBlock::new(11, 1)
1598 .set_timestamp_ms(1112)
1599 .set_ancestors(round_10_refs.clone())
1600 .build(),
1601 ),
1602 VerifiedBlock::new_for_test(
1604 TestBlock::new(11, 2)
1605 .set_timestamp_ms(1120)
1606 .set_ancestors(round_10_refs.clone())
1607 .build(),
1608 ),
1609 VerifiedBlock::new_for_test(
1611 TestBlock::new(11, 3)
1612 .set_timestamp_ms(1130)
1613 .set_ancestors(round_10_refs.clone())
1614 .build(),
1615 ),
1616 ];
1617
1618 let ancestors_for_round_12 = vec![
1620 round_11[0].reference(),
1621 round_11[1].reference(),
1622 round_11[5].reference(),
1623 ];
1624 let round_12 = [
1625 VerifiedBlock::new_for_test(
1626 TestBlock::new(12, 0)
1627 .set_timestamp_ms(1200)
1628 .set_ancestors(ancestors_for_round_12.clone())
1629 .build(),
1630 ),
1631 VerifiedBlock::new_for_test(
1632 TestBlock::new(12, 2)
1633 .set_timestamp_ms(1220)
1634 .set_ancestors(ancestors_for_round_12.clone())
1635 .build(),
1636 ),
1637 VerifiedBlock::new_for_test(
1638 TestBlock::new(12, 3)
1639 .set_timestamp_ms(1230)
1640 .set_ancestors(ancestors_for_round_12.clone())
1641 .build(),
1642 ),
1643 ];
1644
1645 let ancestors_for_round_13 = vec![
1647 round_12[0].reference(),
1648 round_12[1].reference(),
1649 round_12[2].reference(),
1650 round_11[2].reference(),
1651 ];
1652 let round_13 = [
1653 VerifiedBlock::new_for_test(
1654 TestBlock::new(12, 1)
1655 .set_timestamp_ms(1300)
1656 .set_ancestors(ancestors_for_round_13.clone())
1657 .build(),
1658 ),
1659 VerifiedBlock::new_for_test(
1660 TestBlock::new(12, 2)
1661 .set_timestamp_ms(1320)
1662 .set_ancestors(ancestors_for_round_13.clone())
1663 .build(),
1664 ),
1665 VerifiedBlock::new_for_test(
1666 TestBlock::new(12, 3)
1667 .set_timestamp_ms(1330)
1668 .set_ancestors(ancestors_for_round_13.clone())
1669 .build(),
1670 ),
1671 ];
1672
1673 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1675 let anchor = VerifiedBlock::new_for_test(
1676 TestBlock::new(14, 1)
1677 .set_timestamp_ms(1410)
1678 .set_ancestors(ancestors_for_round_14)
1679 .build(),
1680 );
1681
1682 for b in round_11
1684 .iter()
1685 .chain(round_12.iter())
1686 .chain(round_13.iter())
1687 .chain([anchor.clone()].iter())
1688 {
1689 dag_state.accept_block(b.clone());
1690 }
1691
1692 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1694 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1695 ancestors_refs.sort();
1696 let mut expected_refs = vec![
1697 round_11[0].reference(),
1698 round_11[1].reference(),
1699 round_11[2].reference(),
1700 round_11[5].reference(),
1701 ];
1702 expected_refs.sort(); assert_eq!(
1704 ancestors_refs, expected_refs,
1705 "Expected round 11 ancestors: {:?}. Got: {:?}",
1706 expected_refs, ancestors_refs
1707 );
1708 }
1709
1710 #[tokio::test]
1711 async fn test_link_causal_history() {
1712 let (mut context, _) = Context::new_for_test(4);
1713 context.parameters.dag_state_cached_rounds = 10;
1714 context.protocol_config.set_gc_depth_for_testing(3);
1715 let context = Arc::new(context);
1716
1717 let store = Arc::new(MemStore::new());
1718 let mut dag_state = DagState::new(context.clone(), store.clone());
1719
1720 let mut dag_builder = DagBuilder::new(context.clone());
1722 dag_builder.layers(1..=3).build();
1723 dag_builder
1724 .layers(4..=6)
1725 .authorities(vec![AuthorityIndex::new_for_test(0)])
1726 .skip_block()
1727 .build();
1728
1729 let all_blocks = dag_builder.all_blocks();
1731 dag_state.accept_blocks(all_blocks.clone());
1732
1733 for block in &all_blocks {
1735 assert!(!dag_state.has_been_included(&block.reference()));
1736 }
1737
1738 let round_1_block = &all_blocks[1];
1740 assert_eq!(round_1_block.round(), 1);
1741 let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1742
1743 assert_eq!(linked_blocks.len(), 1);
1745 assert_eq!(linked_blocks[0], round_1_block.reference());
1746 for block_ref in linked_blocks {
1747 assert!(dag_state.has_been_included(&block_ref));
1748 }
1749
1750 let round_2_block = &all_blocks[4];
1752 assert_eq!(round_2_block.round(), 2);
1753 let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1754
1755 assert_eq!(linked_blocks.len(), 4);
1757 for block_ref in linked_blocks {
1758 assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1759 }
1760
1761 for block in &all_blocks {
1763 if block.round() == 1 || block.reference() == round_2_block.reference() {
1764 assert!(dag_state.has_been_included(&block.reference()));
1765 } else {
1766 assert!(!dag_state.has_been_included(&block.reference()));
1767 }
1768 }
1769
1770 let round_6_block = all_blocks.last().unwrap();
1772 assert_eq!(round_6_block.round(), 6);
1773
1774 let last_commit = TrustedCommit::new_for_test(
1776 6,
1777 CommitDigest::MIN,
1778 context.clock.timestamp_utc_ms(),
1779 round_6_block.reference(),
1780 vec![],
1781 );
1782 dag_state.set_last_commit(last_commit);
1783 assert_eq!(
1784 dag_state.gc_round(),
1785 3,
1786 "GC round should have moved to round 3"
1787 );
1788
1789 let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1791
1792 assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1794 for block_ref in linked_blocks {
1795 assert!(
1796 block_ref.round == 4
1797 || block_ref.round == 5
1798 || block_ref == round_6_block.reference()
1799 );
1800 }
1801
1802 for block in &all_blocks {
1804 let block_ref = block.reference();
1805 if block.round() == 1
1806 || block_ref == round_2_block.reference()
1807 || block_ref.round == 4
1808 || block_ref.round == 5
1809 || block_ref == round_6_block.reference()
1810 {
1811 assert!(dag_state.has_been_included(&block.reference()));
1812 } else {
1813 assert!(!dag_state.has_been_included(&block.reference()));
1814 }
1815 }
1816 }
1817
1818 #[tokio::test]
1819 async fn test_block_children_basics() {
1820 let (mut context, _) = Context::new_for_test(4);
1821 context.parameters.dag_state_cached_rounds = 2;
1824 context.protocol_config.set_gc_depth_for_testing(3);
1825 context.protocol_config.set_enable_v3_for_testing(true);
1826 let context = Arc::new(context);
1827
1828 let store = Arc::new(MemStore::new());
1829 let mut dag_state = DagState::new(context.clone(), store.clone());
1830
1831 let mut dag_builder = DagBuilder::new(context.clone());
1833 dag_builder.layers(1..=5).build();
1834
1835 let all_blocks = dag_builder.all_blocks();
1836 dag_state.accept_blocks(all_blocks.clone());
1837
1838 let mut expected_children: BTreeMap<BlockRef, BTreeSet<BlockRef>> = BTreeMap::new();
1841 for block in &all_blocks {
1842 expected_children
1843 .entry(block.reference())
1844 .or_default()
1845 .extend(
1846 all_blocks
1847 .iter()
1848 .filter(|b| b.round() == block.round() + 1)
1849 .map(|b| b.reference()),
1850 );
1851 }
1852
1853 for block in &all_blocks {
1855 let block_ref = block.reference();
1856 let actual: BTreeSet<BlockRef> = dag_state
1857 .get_block_children(&block_ref)
1858 .expect("accepted block should be in recent_blocks")
1859 .into_iter()
1860 .collect();
1861 let want = expected_children.get(&block_ref).cloned().unwrap();
1862 assert_eq!(actual, want, "mismatched children for {block_ref:?}");
1863
1864 let expected_authorities: BTreeSet<AuthorityIndex> =
1866 want.iter().map(|r| r.author).collect();
1867 let expected_stake: Stake = expected_authorities
1868 .iter()
1869 .map(|a| context.committee.stake(*a))
1870 .sum();
1871 assert_eq!(
1872 dag_state
1873 .get_block_children_authorities(&block_ref)
1874 .expect("accepted block should be in recent_blocks"),
1875 expected_authorities,
1876 "mismatched children_authorities for {block_ref:?}"
1877 );
1878 assert_eq!(
1879 dag_state
1880 .get_block_total_children_stake(&block_ref)
1881 .expect("accepted block should be in recent_blocks"),
1882 expected_stake,
1883 "mismatched total_children_stake for {block_ref:?}"
1884 );
1885 }
1886
1887 let round_2_block = all_blocks
1891 .iter()
1892 .find(|b| b.round() == 2)
1893 .expect("should have a round-2 block")
1894 .clone();
1895 let before: Vec<(BlockRef, BTreeSet<BlockRef>)> = round_2_block
1896 .ancestors()
1897 .iter()
1898 .map(|a| {
1899 (
1900 *a,
1901 dag_state
1902 .get_block_children(a)
1903 .unwrap()
1904 .into_iter()
1905 .collect(),
1906 )
1907 })
1908 .collect();
1909 dag_state.accept_block(round_2_block);
1910 for (ancestor, before_set) in before {
1911 let after: BTreeSet<BlockRef> = dag_state
1912 .get_block_children(&ancestor)
1913 .unwrap()
1914 .into_iter()
1915 .collect();
1916 assert_eq!(
1917 before_set, after,
1918 "children changed for {ancestor:?} after re-accept"
1919 );
1920 }
1921
1922 let round_5_leader = all_blocks
1926 .last()
1927 .expect("last block should be round 5")
1928 .reference();
1929 let last_commit = TrustedCommit::new_for_test(
1930 5,
1931 CommitDigest::MIN,
1932 context.clock.timestamp_utc_ms(),
1933 round_5_leader,
1934 vec![],
1935 );
1936 dag_state.set_last_commit(last_commit);
1937 assert_eq!(dag_state.gc_round(), 2);
1938
1939 for block in all_blocks.iter().filter(|block| block.round() <= 2) {
1942 let block_ref = block.reference();
1943 assert!(
1944 dag_state.get_block_children(&block_ref).is_none(),
1945 "below-gc block {block_ref:?} should hide children before flush"
1946 );
1947 assert!(
1948 dag_state
1949 .get_block_children_authorities(&block_ref)
1950 .is_none(),
1951 "below-gc block {block_ref:?} should hide child authorities before flush"
1952 );
1953 assert!(
1954 dag_state
1955 .get_block_total_children_stake(&block_ref)
1956 .is_none(),
1957 "below-gc block {block_ref:?} should hide child stake before flush"
1958 );
1959 }
1960
1961 dag_state.flush();
1962
1963 for block in &all_blocks {
1967 let block_ref = block.reference();
1968 match block.round() {
1969 1..=2 => assert!(
1970 dag_state.get_block_children(&block_ref).is_none(),
1971 "round {} block {block_ref:?} should be evicted after flush",
1972 block.round()
1973 ),
1974 3..=4 => {
1975 let actual: BTreeSet<BlockRef> = dag_state
1976 .get_block_children(&block_ref)
1977 .expect("above-gc block should remain")
1978 .into_iter()
1979 .collect();
1980 let want = expected_children
1981 .get(&block_ref)
1982 .cloned()
1983 .unwrap_or_default();
1984 assert_eq!(
1985 actual, want,
1986 "children changed for {block_ref:?} after flush"
1987 );
1988 let expected_authorities: BTreeSet<AuthorityIndex> =
1991 want.iter().map(|r| r.author).collect();
1992 let expected_stake: Stake = expected_authorities
1993 .iter()
1994 .map(|a| context.committee.stake(*a))
1995 .sum();
1996 assert_eq!(
1997 dag_state
1998 .get_block_children_authorities(&block_ref)
1999 .expect("above-gc block should remain"),
2000 expected_authorities,
2001 "children_authorities changed for {block_ref:?} after flush"
2002 );
2003 assert_eq!(
2004 dag_state
2005 .get_block_total_children_stake(&block_ref)
2006 .expect("above-gc block should remain"),
2007 expected_stake,
2008 "total_children_stake changed for {block_ref:?} after flush"
2009 );
2010 }
2011 5 => {
2012 let actual = dag_state
2013 .get_block_children(&block_ref)
2014 .expect("round-5 block should remain");
2015 assert!(
2016 actual.is_empty(),
2017 "round-5 block {block_ref:?} has unexpected children {actual:?}"
2018 );
2019 assert!(
2020 dag_state
2021 .get_block_children_authorities(&block_ref)
2022 .expect("round-5 block should remain")
2023 .is_empty(),
2024 "round-5 block {block_ref:?} must have no children_authorities"
2025 );
2026 assert_eq!(
2027 dag_state
2028 .get_block_total_children_stake(&block_ref)
2029 .expect("round-5 block should remain"),
2030 0,
2031 "round-5 block {block_ref:?} must have zero total_children_stake"
2032 );
2033 }
2034 _ => unreachable!(),
2035 }
2036 }
2037 }
2038
2039 #[tokio::test]
2040 async fn test_block_children_exclusion() {
2041 let (mut context, _) = Context::new_for_test(4);
2042 context.parameters.dag_state_cached_rounds = 10;
2043 context.protocol_config.set_gc_depth_for_testing(3);
2044 context.protocol_config.set_enable_v3_for_testing(true);
2045 let context = Arc::new(context);
2046
2047 let store = Arc::new(MemStore::new());
2048 let mut dag_state = DagState::new(context.clone(), store.clone());
2049
2050 let mut dag_builder = DagBuilder::new(context.clone());
2052 dag_builder.layers(1..=2).build();
2053 let base_blocks = dag_builder.all_blocks();
2054 dag_state.accept_blocks(base_blocks.clone());
2055
2056 let round_2_refs: Vec<BlockRef> = base_blocks
2060 .iter()
2061 .filter(|b| b.round() == 2 && b.author() != AuthorityIndex::new_for_test(0))
2062 .map(|b| b.reference())
2063 .collect();
2064 let weak_ancestor = base_blocks
2065 .iter()
2066 .find(|b| b.round() == 1 && b.author() == AuthorityIndex::new_for_test(0))
2067 .expect("should have a round-1 authority-0 block")
2068 .reference();
2069
2070 let mut ancestors = round_2_refs.clone();
2071 ancestors.push(weak_ancestor);
2072 let round_3 =
2073 VerifiedBlock::new_for_test(TestBlock::new(3, 1).set_ancestors_raw(ancestors).build());
2074 let round_3_ref = round_3.reference();
2075 dag_state.accept_block(round_3);
2076
2077 let author_1 = AuthorityIndex::new_for_test(1);
2081 let stake_1 = context.committee.stake(author_1);
2082 for r2_ref in &round_2_refs {
2083 let children = dag_state
2084 .get_block_children(r2_ref)
2085 .expect("round-2 block should still be present");
2086 assert!(
2087 children.contains(&round_3_ref),
2088 "round-2 parent {r2_ref:?} should have round-3 block as child",
2089 );
2090 let authorities = dag_state
2091 .get_block_children_authorities(r2_ref)
2092 .expect("round-2 block should still be present");
2093 assert_eq!(
2094 authorities,
2095 BTreeSet::from([author_1]),
2096 "round-2 parent {r2_ref:?} should have children_authorities == {{1}}",
2097 );
2098 assert_eq!(
2099 dag_state
2100 .get_block_total_children_stake(r2_ref)
2101 .expect("round-2 block should still be present"),
2102 stake_1,
2103 "round-2 parent {r2_ref:?} total_children_stake mismatch",
2104 );
2105 }
2106
2107 let weak_children = dag_state
2110 .get_block_children(&weak_ancestor)
2111 .expect("weak ancestor should still be present");
2112 assert!(
2113 !weak_children.contains(&round_3_ref),
2114 "weak ancestor {weak_ancestor:?} must NOT have round-3 block as child",
2115 );
2116 for c in &weak_children {
2117 assert_eq!(
2118 c.round, 2,
2119 "weak ancestor's children should all be round 2, got {c:?}"
2120 );
2121 }
2122 let weak_authorities = dag_state
2127 .get_block_children_authorities(&weak_ancestor)
2128 .expect("weak ancestor should still be present");
2129 let expected_weak_authorities: BTreeSet<AuthorityIndex> =
2130 (0..4).map(AuthorityIndex::new_for_test).collect();
2131 assert_eq!(
2132 weak_authorities, expected_weak_authorities,
2133 "weak ancestor {weak_ancestor:?} children_authorities should cover all 4 round-2 authors",
2134 );
2135 assert_eq!(
2136 dag_state
2137 .get_block_total_children_stake(&weak_ancestor)
2138 .expect("weak ancestor should still be present"),
2139 context.committee.total_stake(),
2140 "weak ancestor {weak_ancestor:?} total_children_stake should equal total committee stake",
2141 );
2142 }
2143
2144 #[tokio::test]
2145 async fn test_round_info() {
2146 let (mut context, _) = Context::new_for_test(4);
2147 context.parameters.dag_state_cached_rounds = 2;
2149 context.protocol_config.set_gc_depth_for_testing(3);
2150 context.protocol_config.set_enable_v3_for_testing(true);
2151 let context = Arc::new(context);
2152
2153 let store = Arc::new(MemStore::new());
2154 let mut dag_state = DagState::new(context.clone(), store.clone());
2155
2156 assert!(dag_state.round_info(1).is_none());
2158
2159 let mut dag_builder = DagBuilder::new(context.clone());
2162 dag_builder.layers(1..=4).build();
2163 dag_state.accept_blocks(dag_builder.all_blocks());
2164
2165 let all_authorities: BTreeSet<AuthorityIndex> =
2166 (0..4).map(AuthorityIndex::new_for_test).collect();
2167 for round in 1..=4 {
2168 let info = dag_state
2169 .round_info(round)
2170 .unwrap_or_else(|| panic!("round_info missing for round {round}"));
2171 assert_eq!(info.round, round);
2172 assert_eq!(
2173 info.authorities, all_authorities,
2174 "round {round} authorities mismatch"
2175 );
2176 assert_eq!(
2177 info.total_stake,
2178 context.committee.total_stake(),
2179 "round {round} total_stake mismatch"
2180 );
2181 }
2182
2183 assert!(dag_state.round_info(5).is_none());
2185
2186 dag_state.accept_blocks(dag_builder.all_blocks());
2189 for round in 1..=4 {
2190 let info = dag_state.round_info(round).unwrap();
2191 assert_eq!(
2192 info.authorities, all_authorities,
2193 "round {round} authorities changed after re-accept"
2194 );
2195 assert_eq!(
2196 info.total_stake,
2197 context.committee.total_stake(),
2198 "round {round} total_stake changed after re-accept"
2199 );
2200 }
2201
2202 let round_5_block = VerifiedBlock::new_for_test(
2204 TestBlock::new(5, 0)
2205 .set_ancestors_raw(
2206 dag_builder
2207 .all_blocks()
2208 .iter()
2209 .filter(|b| b.round() == 4)
2210 .map(|b| b.reference())
2211 .collect(),
2212 )
2213 .build(),
2214 );
2215 dag_state.accept_block(round_5_block);
2216 let info_5 = dag_state.round_info(5).expect("round 5 entry should exist");
2217 let author_0 = AuthorityIndex::new_for_test(0);
2218 assert_eq!(info_5.authorities, BTreeSet::from([author_0]));
2219 assert_eq!(info_5.total_stake, context.committee.stake(author_0));
2220
2221 let round_5_leader_ref = dag_state
2224 .recent_refs_by_authority
2225 .iter()
2226 .flat_map(|set| set.iter())
2227 .find(|r| r.round == 5)
2228 .copied()
2229 .expect("round 5 block should be accepted");
2230 let last_commit = TrustedCommit::new_for_test(
2231 5,
2232 CommitDigest::MIN,
2233 context.clock.timestamp_utc_ms(),
2234 round_5_leader_ref,
2235 vec![],
2236 );
2237 dag_state.set_last_commit(last_commit);
2238 assert_eq!(dag_state.gc_round(), 2);
2239
2240 for round in 1..=2 {
2243 assert!(
2244 dag_state.round_info(round).is_none(),
2245 "round {round} should be hidden before flush after GC advances"
2246 );
2247 }
2248
2249 dag_state.flush();
2250
2251 for round in 1..=2 {
2253 assert!(
2254 dag_state.round_info(round).is_none(),
2255 "round {round} should be evicted after flush"
2256 );
2257 }
2258 for round in 3..=4 {
2259 let info = dag_state
2260 .round_info(round)
2261 .unwrap_or_else(|| panic!("round_info missing for round {round} after flush"));
2262 assert_eq!(info.authorities, all_authorities);
2263 assert_eq!(info.total_stake, context.committee.total_stake());
2264 }
2265 let info_5 = dag_state
2266 .round_info(5)
2267 .expect("round 5 should still be present after flush");
2268 assert_eq!(info_5.authorities, BTreeSet::from([author_0]));
2269 assert_eq!(info_5.total_stake, context.committee.stake(author_0));
2270 }
2271
2272 #[tokio::test]
2273 async fn test_contains_blocks_in_cache_or_store() {
2274 const CACHED_ROUNDS: Round = 2;
2276
2277 let (mut context, _) = Context::new_for_test(4);
2278 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2279
2280 let context = Arc::new(context);
2281 let store = Arc::new(MemStore::new());
2282 let mut dag_state = DagState::new(context.clone(), store.clone());
2283
2284 let num_rounds: u32 = 10;
2286 let num_authorities: u32 = 4;
2287 let mut blocks = Vec::new();
2288
2289 for round in 1..=num_rounds {
2290 for author in 0..num_authorities {
2291 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2292 blocks.push(block);
2293 }
2294 }
2295
2296 blocks.clone().into_iter().for_each(|block| {
2298 if block.round() <= 4 {
2299 store
2300 .write(WriteBatch::default().blocks(vec![block]))
2301 .unwrap();
2302 } else {
2303 dag_state.accept_blocks(vec![block]);
2304 }
2305 });
2306
2307 let mut block_refs = blocks
2310 .iter()
2311 .map(|block| block.reference())
2312 .collect::<Vec<_>>();
2313 let result = dag_state.contains_blocks(block_refs.clone());
2314
2315 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
2317 assert_eq!(result, expected);
2318
2319 block_refs.insert(
2321 3,
2322 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2323 );
2324 let result = dag_state.contains_blocks(block_refs.clone());
2325
2326 expected.insert(3, false);
2328 assert_eq!(result, expected.clone());
2329 }
2330
2331 #[tokio::test]
2332 async fn test_contains_cached_block_at_slot() {
2333 const CACHED_ROUNDS: Round = 2;
2335
2336 let num_authorities: u32 = 4;
2337 let (mut context, _) = Context::new_for_test(num_authorities as usize);
2338 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2339
2340 let context = Arc::new(context);
2341 let store = Arc::new(MemStore::new());
2342 let mut dag_state = DagState::new(context.clone(), store.clone());
2343
2344 let num_rounds: u32 = 10;
2346 let mut blocks = Vec::new();
2347
2348 for round in 1..=num_rounds {
2349 for author in 0..num_authorities {
2350 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2351 blocks.push(block.clone());
2352 dag_state.accept_block(block);
2353 }
2354 }
2355
2356 for (author, _) in context.committee.authorities() {
2358 assert!(
2359 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
2360 "Genesis should always be found"
2361 );
2362 }
2363
2364 let mut block_refs = blocks
2367 .iter()
2368 .map(|block| block.reference())
2369 .collect::<Vec<_>>();
2370
2371 for block_ref in block_refs.clone() {
2372 let slot = block_ref.into();
2373 let found = dag_state.contains_cached_block_at_slot(slot);
2374 assert!(found, "A block should be found at slot {}", slot);
2375 }
2376
2377 block_refs.insert(
2380 3,
2381 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2382 );
2383 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
2384 expected.insert(3, false);
2385
2386 for block_ref in block_refs {
2388 let slot = block_ref.into();
2389 let found = dag_state.contains_cached_block_at_slot(slot);
2390
2391 assert_eq!(expected.remove(0), found);
2392 }
2393 }
2394
2395 #[tokio::test]
2396 #[ignore]
2397 #[should_panic(
2398 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
2399 )]
2400 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
2401 const GC_DEPTH: u32 = 2;
2404 const CACHED_ROUNDS: Round = 3;
2406
2407 let (mut context, _) = Context::new_for_test(4);
2408 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2409 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2410
2411 let context = Arc::new(context);
2412 let store = Arc::new(MemStore::new());
2413 let mut dag_state = DagState::new(context.clone(), store.clone());
2414
2415 let mut dag_builder = DagBuilder::new(context.clone());
2417 dag_builder.layers(1..=3).build();
2418 dag_builder
2419 .layers(4..=6)
2420 .authorities(vec![AuthorityIndex::new_for_test(0)])
2421 .skip_block()
2422 .build();
2423
2424 dag_builder
2426 .all_blocks()
2427 .into_iter()
2428 .for_each(|block| dag_state.accept_block(block));
2429
2430 dag_state.add_commit(TrustedCommit::new_for_test(
2432 1 as CommitIndex,
2433 CommitDigest::MIN,
2434 0,
2435 dag_builder.leader_block(5).unwrap().reference(),
2436 vec![],
2437 ));
2438 dag_state.flush();
2440
2441 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
2443
2444 for authority_index in 1..=3 {
2448 for round in 4..=6 {
2449 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
2450 round,
2451 AuthorityIndex::new_for_test(authority_index)
2452 )));
2453 }
2454 }
2455
2456 for round in 1..=3 {
2457 assert!(
2458 dag_state.contains_cached_block_at_slot(Slot::new(
2459 round,
2460 AuthorityIndex::new_for_test(0)
2461 ))
2462 );
2463 }
2464
2465 let _ =
2468 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
2469 }
2470
2471 #[tokio::test]
2472 async fn test_get_blocks_in_cache_or_store() {
2473 let (context, _) = Context::new_for_test(4);
2474 let context = Arc::new(context);
2475 let store = Arc::new(MemStore::new());
2476 let mut dag_state = DagState::new(context.clone(), store.clone());
2477
2478 let num_rounds: u32 = 10;
2480 let num_authorities: u32 = 4;
2481 let mut blocks = Vec::new();
2482
2483 for round in 1..=num_rounds {
2484 for author in 0..num_authorities {
2485 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2486 blocks.push(block);
2487 }
2488 }
2489
2490 blocks.clone().into_iter().for_each(|block| {
2492 if block.round() <= 4 {
2493 store
2494 .write(WriteBatch::default().blocks(vec![block]))
2495 .unwrap();
2496 } else {
2497 dag_state.accept_blocks(vec![block]);
2498 }
2499 });
2500
2501 let mut block_refs = blocks
2504 .iter()
2505 .map(|block| block.reference())
2506 .collect::<Vec<_>>();
2507 let result = dag_state.get_blocks(&block_refs);
2508
2509 let mut expected = blocks
2510 .into_iter()
2511 .map(Some)
2512 .collect::<Vec<Option<VerifiedBlock>>>();
2513
2514 assert_eq!(result, expected.clone());
2516
2517 block_refs.insert(
2519 3,
2520 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2521 );
2522 let result = dag_state.get_blocks(&block_refs);
2523
2524 expected.insert(3, None);
2526 assert_eq!(result, expected);
2527 }
2528
2529 #[tokio::test]
2530 async fn test_flush_and_recovery() {
2531 telemetry_subscribers::init_for_testing();
2532
2533 const GC_DEPTH: u32 = 3;
2534 const CACHED_ROUNDS: u32 = 4;
2535
2536 let num_authorities: u32 = 4;
2537 let (mut context, _) = Context::new_for_test(num_authorities as usize);
2538 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2539 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2540
2541 let context = Arc::new(context);
2542
2543 let store = Arc::new(MemStore::new());
2544 let mut dag_state = DagState::new(context.clone(), store.clone());
2545
2546 const NUM_ROUNDS: Round = 20;
2547 let mut dag_builder = DagBuilder::new(context.clone());
2548 dag_builder.layers(1..=5).build();
2549 dag_builder
2550 .layers(6..=8)
2551 .authorities(vec![AuthorityIndex::new_for_test(0)])
2552 .skip_block()
2553 .build();
2554 dag_builder.layers(9..=NUM_ROUNDS).build();
2555
2556 const LAST_COMMIT_ROUND: Round = 16;
2558 const LAST_COMMIT_INDEX: CommitIndex = 15;
2559 let commits = dag_builder
2560 .get_sub_dag_and_commits(1..=NUM_ROUNDS)
2561 .into_iter()
2562 .map(|(_subdag, commit)| commit)
2563 .take(LAST_COMMIT_INDEX as usize)
2564 .collect::<Vec<_>>();
2565 assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
2566 assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
2567
2568 const PERSISTED_BLOCK_ROUNDS: u32 = 12;
2572 const NUM_PERSISTED_COMMITS: usize = 8;
2573 const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
2574 const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
2575 dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
2576 let mut finalized_commits = vec![];
2577 for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
2578 finalized_commits.push(commit.clone());
2579 dag_state.add_commit(commit);
2580 }
2581 let last_finalized_commit = finalized_commits.last().unwrap();
2582 assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
2583 assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
2584
2585 let finalized_blocks = finalized_commits
2587 .iter()
2588 .flat_map(|commit| commit.blocks())
2589 .collect::<BTreeSet<_>>();
2590
2591 dag_state.flush();
2593
2594 let store_blocks = store
2596 .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
2597 .unwrap();
2598 assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
2599 let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
2600 assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
2601 assert_eq!(
2602 store_commits.last().unwrap().index(),
2603 LAST_PERSISTED_COMMIT_INDEX
2604 );
2605 assert_eq!(
2606 store_commits.last().unwrap().round(),
2607 LAST_PERSISTED_COMMIT_ROUND
2608 );
2609
2610 dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
2612 for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
2613 dag_state.add_commit(commit);
2614 }
2615
2616 let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
2618 let block_refs = all_blocks
2619 .iter()
2620 .map(|block| block.reference())
2621 .collect::<Vec<_>>();
2622 let result = dag_state
2623 .get_blocks(&block_refs)
2624 .into_iter()
2625 .map(|b| b.unwrap())
2626 .collect::<Vec<_>>();
2627 assert_eq!(result, all_blocks);
2628
2629 assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2631
2632 drop(dag_state);
2634
2635 let dag_state = DagState::new(context.clone(), store.clone());
2637
2638 let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2640 let block_refs = all_blocks
2641 .iter()
2642 .map(|block| block.reference())
2643 .collect::<Vec<_>>();
2644 let result = dag_state
2645 .get_blocks(&block_refs)
2646 .into_iter()
2647 .map(|b| b.unwrap())
2648 .collect::<Vec<_>>();
2649 assert_eq!(result, all_blocks);
2650
2651 let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2653 let block_refs = missing_blocks
2654 .iter()
2655 .map(|block| block.reference())
2656 .collect::<Vec<_>>();
2657 let retrieved_blocks = dag_state
2658 .get_blocks(&block_refs)
2659 .into_iter()
2660 .flatten()
2661 .collect::<Vec<_>>();
2662 assert!(retrieved_blocks.is_empty());
2663
2664 assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2666 assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2667
2668 let expected_last_committed_rounds = vec![5, 9, 8, 8];
2670 assert_eq!(
2671 dag_state.last_committed_rounds(),
2672 expected_last_committed_rounds
2673 );
2674 assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2676
2677 for (authority_index, _) in context.committee.authorities() {
2679 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2680
2681 if authority_index == AuthorityIndex::new_for_test(0) {
2685 assert_eq!(blocks.len(), 4);
2686 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2687 assert!(
2688 blocks
2689 .into_iter()
2690 .all(|block| block.round() >= 7 && block.round() <= 12)
2691 );
2692 } else {
2693 assert_eq!(blocks.len(), 6);
2694 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2695 assert!(
2696 blocks
2697 .into_iter()
2698 .all(|block| block.round() >= 7 && block.round() <= 12)
2699 );
2700 }
2701 }
2702
2703 let gc_round = dag_state.gc_round();
2705 assert_eq!(gc_round, 6);
2706 dag_state
2707 .recent_blocks
2708 .iter()
2709 .for_each(|(block_ref, block_info)| {
2710 if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2711 assert!(
2712 block_info.committed,
2713 "Block {:?} should be set as committed",
2714 block_ref
2715 );
2716 }
2717 });
2718
2719 dag_state
2724 .recent_blocks
2725 .iter()
2726 .for_each(|(block_ref, block_info)| {
2727 if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2728 assert!(block_info.included);
2729 } else {
2730 assert!(!block_info.included);
2731 }
2732 });
2733 }
2734
2735 #[tokio::test]
2736 async fn test_block_info_as_committed() {
2737 let num_authorities: u32 = 4;
2738 let (context, _) = Context::new_for_test(num_authorities as usize);
2739 let context = Arc::new(context);
2740
2741 let store = Arc::new(MemStore::new());
2742 let mut dag_state = DagState::new(context.clone(), store.clone());
2743
2744 let block = VerifiedBlock::new_for_test(
2746 TestBlock::new(1, 0)
2747 .set_timestamp_ms(1000)
2748 .set_ancestors(vec![])
2749 .build(),
2750 );
2751
2752 dag_state.accept_block(block.clone());
2753
2754 assert!(!dag_state.is_committed(&block.reference()));
2756
2757 assert!(
2759 dag_state.set_committed(&block.reference()),
2760 "Block should be successfully set as committed for first time"
2761 );
2762
2763 assert!(dag_state.is_committed(&block.reference()));
2765
2766 assert!(
2768 !dag_state.set_committed(&block.reference()),
2769 "Block should not be successfully set as committed"
2770 );
2771 }
2772
2773 #[tokio::test]
2774 async fn test_get_cached_blocks() {
2775 let (mut context, _) = Context::new_for_test(4);
2776 context.parameters.dag_state_cached_rounds = 5;
2777
2778 let context = Arc::new(context);
2779 let store = Arc::new(MemStore::new());
2780 let mut dag_state = DagState::new(context.clone(), store.clone());
2781
2782 let mut all_blocks = Vec::new();
2787 for author in 1..=3 {
2788 for round in 10..(10 + author) {
2789 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2790 all_blocks.push(block.clone());
2791 dag_state.accept_block(block);
2792 }
2793 }
2794
2795 let cached_blocks =
2798 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2799 assert!(cached_blocks.is_empty());
2800
2801 let cached_blocks =
2802 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2803 assert_eq!(cached_blocks.len(), 1);
2804 assert_eq!(cached_blocks[0].round(), 10);
2805
2806 let cached_blocks =
2807 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2808 assert_eq!(cached_blocks.len(), 2);
2809 assert_eq!(cached_blocks[0].round(), 10);
2810 assert_eq!(cached_blocks[1].round(), 11);
2811
2812 let cached_blocks =
2813 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2814 assert_eq!(cached_blocks.len(), 1);
2815 assert_eq!(cached_blocks[0].round(), 11);
2816
2817 let cached_blocks =
2818 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2819 assert_eq!(cached_blocks.len(), 3);
2820 assert_eq!(cached_blocks[0].round(), 10);
2821 assert_eq!(cached_blocks[1].round(), 11);
2822 assert_eq!(cached_blocks[2].round(), 12);
2823
2824 let cached_blocks =
2825 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2826 assert_eq!(cached_blocks.len(), 1);
2827 assert_eq!(cached_blocks[0].round(), 12);
2828
2829 let cached_blocks = dag_state.get_cached_blocks_in_range(
2833 context.committee.to_authority_index(3).unwrap(),
2834 10,
2835 10,
2836 1,
2837 );
2838 assert!(cached_blocks.is_empty());
2839
2840 let cached_blocks = dag_state.get_cached_blocks_in_range(
2842 context.committee.to_authority_index(3).unwrap(),
2843 11,
2844 10,
2845 1,
2846 );
2847 assert!(cached_blocks.is_empty());
2848
2849 let cached_blocks = dag_state.get_cached_blocks_in_range(
2851 context.committee.to_authority_index(0).unwrap(),
2852 9,
2853 10,
2854 1,
2855 );
2856 assert!(cached_blocks.is_empty());
2857
2858 let cached_blocks = dag_state.get_cached_blocks_in_range(
2860 context.committee.to_authority_index(1).unwrap(),
2861 9,
2862 11,
2863 1,
2864 );
2865 assert_eq!(cached_blocks.len(), 1);
2866 assert_eq!(cached_blocks[0].round(), 10);
2867
2868 let cached_blocks = dag_state.get_cached_blocks_in_range(
2870 context.committee.to_authority_index(2).unwrap(),
2871 9,
2872 12,
2873 5,
2874 );
2875 assert_eq!(cached_blocks.len(), 2);
2876 assert_eq!(cached_blocks[0].round(), 10);
2877 assert_eq!(cached_blocks[1].round(), 11);
2878
2879 let cached_blocks = dag_state.get_cached_blocks_in_range(
2881 context.committee.to_authority_index(3).unwrap(),
2882 11,
2883 20,
2884 5,
2885 );
2886 assert_eq!(cached_blocks.len(), 2);
2887 assert_eq!(cached_blocks[0].round(), 11);
2888 assert_eq!(cached_blocks[1].round(), 12);
2889
2890 let cached_blocks = dag_state.get_cached_blocks_in_range(
2892 context.committee.to_authority_index(3).unwrap(),
2893 10,
2894 20,
2895 1,
2896 );
2897 assert_eq!(cached_blocks.len(), 1);
2898 assert_eq!(cached_blocks[0].round(), 10);
2899 }
2900
2901 #[tokio::test]
2902 async fn test_get_last_cached_block() {
2903 const CACHED_ROUNDS: Round = 2;
2905 const GC_DEPTH: u32 = 1;
2906 let (mut context, _) = Context::new_for_test(4);
2907 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2908 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2909
2910 let context = Arc::new(context);
2911 let store = Arc::new(MemStore::new());
2912 let mut dag_state = DagState::new(context.clone(), store.clone());
2913
2914 let dag_str = "DAG {
2919 Round 0 : { 4 },
2920 Round 1 : {
2921 B -> [*],
2922 C -> [*],
2923 D -> [*],
2924 },
2925 Round 2 : {
2926 C -> [*],
2927 D -> [*],
2928 },
2929 Round 3 : {
2930 D -> [*],
2931 },
2932 }";
2933
2934 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2935
2936 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2938
2939 for block in dag_builder
2941 .all_blocks()
2942 .into_iter()
2943 .chain(std::iter::once(block))
2944 {
2945 dag_state.accept_block(block);
2946 }
2947
2948 dag_state.add_commit(TrustedCommit::new_for_test(
2949 1 as CommitIndex,
2950 CommitDigest::MIN,
2951 context.clock.timestamp_utc_ms(),
2952 dag_builder.leader_block(3).unwrap().reference(),
2953 vec![],
2954 ));
2955
2956 let end_round = 4;
2958 let expected_rounds = vec![0, 1, 2, 3];
2959 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2960 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2962 assert_eq!(
2963 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2964 expected_rounds
2965 );
2966 assert_eq!(
2967 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2968 expected_excluded_and_equivocating_blocks
2969 );
2970
2971 for (i, expected_round) in expected_rounds.iter().enumerate() {
2973 let round = dag_state
2974 .get_last_cached_block_in_range(
2975 context.committee.to_authority_index(i).unwrap(),
2976 0,
2977 end_round,
2978 )
2979 .map(|b| b.round())
2980 .unwrap_or_default();
2981 assert_eq!(round, *expected_round, "Authority {i}");
2982 }
2983
2984 let start_round = 2;
2986 let expected_rounds = [0, 0, 2, 3];
2987
2988 for (i, expected_round) in expected_rounds.iter().enumerate() {
2990 let round = dag_state
2991 .get_last_cached_block_in_range(
2992 context.committee.to_authority_index(i).unwrap(),
2993 start_round,
2994 end_round,
2995 )
2996 .map(|b| b.round())
2997 .unwrap_or_default();
2998 assert_eq!(round, *expected_round, "Authority {i}");
2999 }
3000
3001 dag_state.flush();
3007
3008 let end_round = 3;
3010 let expected_rounds = vec![0, 1, 2, 2];
3011
3012 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
3014 assert_eq!(
3015 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
3016 expected_rounds
3017 );
3018
3019 for (i, expected_round) in expected_rounds.iter().enumerate() {
3021 let round = dag_state
3022 .get_last_cached_block_in_range(
3023 context.committee.to_authority_index(i).unwrap(),
3024 0,
3025 end_round,
3026 )
3027 .map(|b| b.round())
3028 .unwrap_or_default();
3029 assert_eq!(round, *expected_round, "Authority {i}");
3030 }
3031 }
3032
3033 #[tokio::test]
3034 #[should_panic(
3035 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
3036 )]
3037 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
3038 const CACHED_ROUNDS: Round = 1;
3040 const GC_DEPTH: u32 = 1;
3041 let (mut context, _) = Context::new_for_test(4);
3042 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
3043 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
3044
3045 let context = Arc::new(context);
3046 let store = Arc::new(MemStore::new());
3047 let mut dag_state = DagState::new(context.clone(), store.clone());
3048
3049 let mut dag_builder = DagBuilder::new(context.clone());
3054 dag_builder
3055 .layers(1..=1)
3056 .authorities(vec![AuthorityIndex::new_for_test(0)])
3057 .skip_block()
3058 .build();
3059 dag_builder
3060 .layers(2..=2)
3061 .authorities(vec![
3062 AuthorityIndex::new_for_test(0),
3063 AuthorityIndex::new_for_test(1),
3064 ])
3065 .skip_block()
3066 .build();
3067 dag_builder
3068 .layers(3..=3)
3069 .authorities(vec![
3070 AuthorityIndex::new_for_test(0),
3071 AuthorityIndex::new_for_test(1),
3072 AuthorityIndex::new_for_test(2),
3073 ])
3074 .skip_block()
3075 .build();
3076
3077 for block in dag_builder.all_blocks() {
3079 dag_state.accept_block(block);
3080 }
3081
3082 dag_state.add_commit(TrustedCommit::new_for_test(
3083 1 as CommitIndex,
3084 CommitDigest::MIN,
3085 0,
3086 dag_builder.leader_block(3).unwrap().reference(),
3087 vec![],
3088 ));
3089
3090 dag_state.flush();
3092
3093 dag_state.get_last_cached_block_per_authority(2);
3095 }
3096
3097 #[tokio::test]
3098 async fn test_last_quorum() {
3099 let (context, _) = Context::new_for_test(4);
3101 let context = Arc::new(context);
3102 let store = Arc::new(MemStore::new());
3103 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3104
3105 {
3107 let genesis = genesis_blocks(context.as_ref());
3108
3109 assert_eq!(dag_state.read().last_quorum(), genesis);
3110 }
3111
3112 {
3114 let mut dag_builder = DagBuilder::new(context.clone());
3115 dag_builder
3116 .layers(1..=4)
3117 .build()
3118 .persist_layers(dag_state.clone());
3119 let round_4_blocks: Vec<_> = dag_builder
3120 .blocks(4..=4)
3121 .into_iter()
3122 .map(|block| block.reference())
3123 .collect();
3124
3125 let last_quorum = dag_state.read().last_quorum();
3126
3127 assert_eq!(
3128 last_quorum
3129 .into_iter()
3130 .map(|block| block.reference())
3131 .collect::<Vec<_>>(),
3132 round_4_blocks
3133 );
3134 }
3135
3136 {
3138 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
3139 dag_state.write().accept_block(block);
3140
3141 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
3142
3143 let last_quorum = dag_state.read().last_quorum();
3144
3145 assert_eq!(last_quorum, round_4_blocks);
3146 }
3147 }
3148
3149 #[tokio::test]
3150 async fn test_last_block_for_authority() {
3151 let (context, _) = Context::new_for_test(4);
3153 let context = Arc::new(context);
3154 let store = Arc::new(MemStore::new());
3155 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3156
3157 {
3159 let genesis = genesis_blocks(context.as_ref());
3160 let my_genesis = genesis
3161 .into_iter()
3162 .find(|block| block.author() == context.own_index)
3163 .unwrap();
3164
3165 assert_eq!(dag_state.read().get_last_proposed_block(), Some(my_genesis));
3166 }
3167
3168 {
3170 let mut dag_builder = DagBuilder::new(context.clone());
3172 dag_builder
3173 .layers(1..=4)
3174 .build()
3175 .persist_layers(dag_state.clone());
3176
3177 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
3179 dag_state.write().accept_block(block);
3180
3181 let block = dag_state
3182 .read()
3183 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
3184 assert_eq!(block.round(), 5);
3185
3186 for (authority_index, _) in context.committee.authorities() {
3187 let block = dag_state
3188 .read()
3189 .get_last_block_for_authority(authority_index);
3190
3191 if authority_index.value() == 0 {
3192 assert_eq!(block.round(), 5);
3193 } else {
3194 assert_eq!(block.round(), 4);
3195 }
3196 }
3197 }
3198 }
3199
3200 #[tokio::test]
3201 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
3202 let (context, _) = Context::new_for_test(4);
3204 let context = Arc::new(context);
3205 let store = Arc::new(MemStore::new());
3206 let mut dag_state = DagState::new(context.clone(), store.clone());
3207
3208 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
3210
3211 let block = VerifiedBlock::new_for_test(
3212 TestBlock::new(10, 0)
3213 .set_timestamp_ms(block_timestamp)
3214 .build(),
3215 );
3216
3217 dag_state.accept_block(block);
3219 }
3220
3221 #[tokio::test]
3222 async fn test_last_finalized_commit() {
3223 let (context, _) = Context::new_for_test(4);
3225 let context = Arc::new(context);
3226 let store = Arc::new(MemStore::new());
3227 let mut dag_state = DagState::new(context.clone(), store.clone());
3228
3229 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
3231 let rejected_transactions = BTreeMap::new();
3232 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
3233
3234 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
3236 assert_eq!(
3237 dag_state.finalized_commits_to_write[0],
3238 (commit_ref, rejected_transactions.clone())
3239 );
3240
3241 dag_state.flush();
3243
3244 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
3246 assert_eq!(last_finalized_commit, Some(commit_ref));
3247 let stored_rejected_transactions = store
3248 .read_rejected_transactions(commit_ref)
3249 .unwrap()
3250 .unwrap();
3251 assert_eq!(stored_rejected_transactions, rejected_transactions);
3252 }
3253}