1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21 CommittedSubDag,
22 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23 commit::{
24 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26 },
27 context::Context,
28 leader_scoring::{ReputationScores, ScoringSubdag},
29 storage::{Store, WriteBatch},
30 threshold_clock::ThresholdClock,
31};
32
33pub struct DagState {
41 context: Arc<Context>,
42
43 genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59 threshold_clock: ThresholdClock,
61
62 evicted_rounds: Vec<Round>,
66
67 highest_accepted_round: Round,
69
70 last_commit: Option<TrustedCommit>,
72
73 last_commit_round_advancement_time: Option<std::time::Instant>,
75
76 last_committed_rounds: Vec<Round>,
78
79 scoring_subdag: ScoringSubdag,
82
83 pending_commit_votes: VecDeque<CommitVote>,
87
88 blocks_to_write: Vec<VerifiedBlock>,
91 commits_to_write: Vec<TrustedCommit>,
92
93 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98 finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101 store: Arc<dyn Store>,
103
104 cached_rounds: Round,
106}
107
108impl DagState {
109 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112 let num_authorities = context.committee.size();
113
114 let genesis = genesis_blocks(context.as_ref())
115 .into_iter()
116 .map(|block| (block.reference(), block))
117 .collect();
118
119 let threshold_clock = ThresholdClock::new(1, context.clone());
120
121 let last_commit = store
122 .read_last_commit()
123 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125 let commit_info = store
126 .read_last_commit_info()
127 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128 let (mut last_committed_rounds, commit_recovery_start_index) =
129 if let Some((commit_ref, commit_info)) = commit_info {
130 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131 (commit_info.committed_rounds, commit_ref.index + 1)
132 } else {
133 tracing::info!("Found no stored CommitInfo to recover from");
134 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135 };
136
137 let mut unscored_committed_subdags = Vec::new();
138 let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140 if let Some(last_commit) = last_commit.as_ref() {
141 store
142 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144 .iter()
145 .for_each(|commit| {
146 for block_ref in commit.blocks() {
147 last_committed_rounds[block_ref.author] =
148 max(last_committed_rounds[block_ref.author], block_ref.round);
149 }
150 let committed_subdag = load_committed_subdag_from_store(
151 &context,
152 store.as_ref(),
153 commit.clone(),
154 vec![],
155 );
156 unscored_committed_subdags.push(committed_subdag);
158 });
159 }
160
161 tracing::info!(
162 "DagState was initialized with the following state: \
163 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
164 unscored_committed_subdags.len()
165 );
166
167 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
168
169 let mut state = Self {
170 context: context.clone(),
171 genesis,
172 recent_blocks: BTreeMap::new(),
173 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
174 threshold_clock,
175 highest_accepted_round: 0,
176 last_commit: last_commit.clone(),
177 last_commit_round_advancement_time: None,
178 last_committed_rounds: last_committed_rounds.clone(),
179 pending_commit_votes: VecDeque::new(),
180 blocks_to_write: vec![],
181 commits_to_write: vec![],
182 commit_info_to_write: vec![],
183 finalized_commits_to_write: vec![],
184 scoring_subdag,
185 store: store.clone(),
186 cached_rounds,
187 evicted_rounds: vec![0; num_authorities],
188 };
189
190 for (authority_index, _) in context.committee.authorities() {
191 let (blocks, eviction_round) = {
192 let last_block = state
195 .store
196 .scan_last_blocks_by_author(authority_index, 1, None)
197 .expect("Database error");
198 let last_block_round = last_block
199 .last()
200 .map(|b| b.round())
201 .unwrap_or(GENESIS_ROUND);
202
203 let eviction_round =
204 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
205 let blocks = state
206 .store
207 .scan_blocks_by_author(authority_index, eviction_round + 1)
208 .expect("Database error");
209
210 (blocks, eviction_round)
211 };
212
213 state.evicted_rounds[authority_index] = eviction_round;
214
215 for block in &blocks {
217 state.update_block_metadata(block);
218 }
219
220 debug!(
221 "Recovered blocks {}: {:?}",
222 authority_index,
223 blocks
224 .iter()
225 .map(|b| b.reference())
226 .collect::<Vec<BlockRef>>()
227 );
228 }
229
230 if let Some(last_commit) = last_commit {
231 let mut index = last_commit.index();
232 let gc_round = state.gc_round();
233 info!(
234 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
235 index, gc_round
236 );
237
238 loop {
239 let commits = store
240 .scan_commits((index..=index).into())
241 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
242 let Some(commit) = commits.first() else {
243 info!("Recovering finished up to index {index}, no more commits to recover");
244 break;
245 };
246
247 if gc_round > 0 && commit.leader().round <= gc_round {
249 info!(
250 "Recovering finished, reached commit leader round {} <= gc_round {}",
251 commit.leader().round,
252 gc_round
253 );
254 break;
255 }
256
257 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
258 debug!(
259 "Setting block {:?} as committed based on commit {:?}",
260 block_ref,
261 commit.index()
262 );
263 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
264 });
265
266 index = index.saturating_sub(1);
268 if index == 0 {
269 break;
270 }
271 }
272 }
273
274 let proposed_blocks = store
276 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
277 .expect("Database error");
278 for block in proposed_blocks {
279 state.link_causal_history(block.reference());
280 }
281
282 state
283 }
284
285 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
287 assert_ne!(
288 block.round(),
289 0,
290 "Genesis block should not be accepted into DAG."
291 );
292
293 let block_ref = block.reference();
294 if self.contains_block(&block_ref) {
295 return;
296 }
297
298 let now = self.context.clock.timestamp_utc_ms();
299 if block.timestamp_ms() > now {
300 trace!(
301 "Block {:?} with timestamp {} is greater than local timestamp {}.",
302 block,
303 block.timestamp_ms(),
304 now,
305 );
306 }
307 let hostname = &self.context.committee.authority(block_ref.author).hostname;
308 self.context
309 .metrics
310 .node_metrics
311 .accepted_block_time_drift_ms
312 .with_label_values(&[hostname])
313 .inc_by(block.timestamp_ms().saturating_sub(now));
314
315 if block_ref.author == self.context.own_index {
318 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
319 if !self
320 .context
321 .parameters
322 .internal
323 .skip_equivocation_validation
324 {
325 assert!(
326 existing_blocks.is_empty(),
327 "Block Rejected! Attempted to add block {block:#?} to own slot where \
328 block(s) {existing_blocks:#?} already exists."
329 );
330 }
331 }
332 self.update_block_metadata(&block);
333 self.blocks_to_write.push(block);
334 let source = if self.context.own_index == block_ref.author {
335 "own"
336 } else {
337 "others"
338 };
339 self.context
340 .metrics
341 .node_metrics
342 .accepted_blocks
343 .with_label_values(&[source])
344 .inc();
345 }
346
347 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
349 let block_ref = block.reference();
350 self.recent_blocks
351 .insert(block_ref, BlockInfo::new(block.clone()));
352 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
353
354 if self.threshold_clock.add_block(block_ref) {
355 let last_proposed_block = self.get_last_proposed_block();
357 if last_proposed_block.round() == block_ref.round {
358 let quorum_delay_ms = self
359 .context
360 .clock
361 .timestamp_utc_ms()
362 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
363 self.context
364 .metrics
365 .node_metrics
366 .quorum_receive_latency
367 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
368 }
369 }
370
371 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
372 self.context
373 .metrics
374 .node_metrics
375 .highest_accepted_round
376 .set(self.highest_accepted_round as i64);
377
378 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
379 .last()
380 .map(|block_ref| block_ref.round)
381 .expect("There should be by now at least one block ref");
382 let hostname = &self.context.committee.authority(block_ref.author).hostname;
383 self.context
384 .metrics
385 .node_metrics
386 .highest_accepted_authority_round
387 .with_label_values(&[hostname])
388 .set(highest_accepted_round_for_author as i64);
389 }
390
391 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
393 debug!(
394 "Accepting blocks: {}",
395 blocks.iter().map(|b| b.reference().to_string()).join(",")
396 );
397 for block in blocks {
398 self.accept_block(block);
399 }
400 }
401
402 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
405 self.get_blocks(&[*reference])
406 .pop()
407 .expect("Exactly one element should be returned")
408 }
409
410 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
413 let mut blocks = vec![None; block_refs.len()];
414 let mut missing = Vec::new();
415
416 for (index, block_ref) in block_refs.iter().enumerate() {
417 if block_ref.round == GENESIS_ROUND {
418 if let Some(block) = self.genesis.get(block_ref) {
420 blocks[index] = Some(block.clone());
421 }
422 continue;
423 }
424 if let Some(block_info) = self.recent_blocks.get(block_ref) {
425 blocks[index] = Some(block_info.block.clone());
426 continue;
427 }
428 missing.push((index, block_ref));
429 }
430
431 if missing.is_empty() {
432 return blocks;
433 }
434
435 let missing_refs = missing
436 .iter()
437 .map(|(_, block_ref)| **block_ref)
438 .collect::<Vec<_>>();
439 let store_results = self
440 .store
441 .read_blocks(&missing_refs)
442 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
443 self.context
444 .metrics
445 .node_metrics
446 .dag_state_store_read_count
447 .with_label_values(&["get_blocks"])
448 .inc();
449
450 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
451 blocks[index] = result;
452 }
453
454 blocks
455 }
456
457 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
460 let mut blocks = vec![];
464 for (_block_ref, block_info) in self.recent_blocks.range((
465 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
466 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
467 )) {
468 blocks.push(block_info.block.clone())
469 }
470 blocks
471 }
472
473 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
476 if round <= self.last_commit_round() {
477 panic!("Round {} have committed blocks!", round);
478 }
479
480 let mut blocks = vec![];
481 for (_block_ref, block_info) in self.recent_blocks.range((
482 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
483 Excluded(BlockRef::new(
484 round + 1,
485 AuthorityIndex::ZERO,
486 BlockDigest::MIN,
487 )),
488 )) {
489 blocks.push(block_info.block.clone())
490 }
491 blocks
492 }
493
494 pub(crate) fn ancestors_at_round(
496 &self,
497 later_block: &VerifiedBlock,
498 earlier_round: Round,
499 ) -> Vec<VerifiedBlock> {
500 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
502 while !linked.is_empty() {
503 let round = linked.last().unwrap().round;
504 if round <= earlier_round {
506 break;
507 }
508 let block_ref = linked.pop_last().unwrap();
509 let Some(block) = self.get_block(&block_ref) else {
510 panic!("Block {:?} should exist in DAG!", block_ref);
511 };
512 linked.extend(block.ancestors().iter().cloned());
513 }
514 linked
515 .range((
516 Included(BlockRef::new(
517 earlier_round,
518 AuthorityIndex::ZERO,
519 BlockDigest::MIN,
520 )),
521 Unbounded,
522 ))
523 .map(|r| {
524 self.get_block(r)
525 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
526 .clone()
527 })
528 .collect()
529 }
530
531 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
534 self.get_last_block_for_authority(self.context.own_index)
535 }
536
537 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
540 if let Some(last) = self.recent_refs_by_authority[authority].last() {
541 return self
542 .recent_blocks
543 .get(last)
544 .expect("Block should be found in recent blocks")
545 .block
546 .clone();
547 }
548
549 let (_, genesis_block) = self
551 .genesis
552 .iter()
553 .find(|(block_ref, _)| block_ref.author == authority)
554 .expect("Genesis should be found for authority {authority_index}");
555 genesis_block.clone()
556 }
557
558 pub(crate) fn get_cached_blocks(
564 &self,
565 authority: AuthorityIndex,
566 start: Round,
567 ) -> Vec<VerifiedBlock> {
568 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
569 }
570
571 pub(crate) fn get_cached_blocks_in_range(
574 &self,
575 authority: AuthorityIndex,
576 start_round: Round,
577 end_round: Round,
578 limit: usize,
579 ) -> Vec<VerifiedBlock> {
580 if start_round >= end_round || limit == 0 {
581 return vec![];
582 }
583
584 let mut blocks = vec![];
585 for block_ref in self.recent_refs_by_authority[authority].range((
586 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
587 Excluded(BlockRef::new(
588 end_round,
589 AuthorityIndex::MIN,
590 BlockDigest::MIN,
591 )),
592 )) {
593 let block_info = self
594 .recent_blocks
595 .get(block_ref)
596 .expect("Block should exist in recent blocks");
597 blocks.push(block_info.block.clone());
598 if blocks.len() >= limit {
599 break;
600 }
601 }
602 blocks
603 }
604
605 pub(crate) fn get_last_cached_block_in_range(
607 &self,
608 authority: AuthorityIndex,
609 start_round: Round,
610 end_round: Round,
611 ) -> Option<VerifiedBlock> {
612 if start_round >= end_round {
613 return None;
614 }
615
616 let block_ref = self.recent_refs_by_authority[authority]
617 .range((
618 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
619 Excluded(BlockRef::new(
620 end_round,
621 AuthorityIndex::MIN,
622 BlockDigest::MIN,
623 )),
624 ))
625 .last()?;
626
627 self.recent_blocks
628 .get(block_ref)
629 .map(|block_info| block_info.block.clone())
630 }
631
632 pub(crate) fn get_last_cached_block_per_authority(
639 &self,
640 end_round: Round,
641 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
642 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
644 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
645
646 if end_round == GENESIS_ROUND {
647 panic!(
648 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
649 );
650 }
651
652 if end_round == GENESIS_ROUND + 1 {
653 return blocks.into_iter().map(|b| (b, vec![])).collect();
654 }
655
656 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
657 let authority_index = self
658 .context
659 .committee
660 .to_authority_index(authority_index)
661 .unwrap();
662
663 let last_evicted_round = self.evicted_rounds[authority_index];
664 if end_round.saturating_sub(1) <= last_evicted_round {
665 panic!(
666 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
667 );
668 }
669
670 let block_ref_iter = block_refs
671 .range((
672 Included(BlockRef::new(
673 last_evicted_round + 1,
674 authority_index,
675 BlockDigest::MIN,
676 )),
677 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
678 ))
679 .rev();
680
681 let mut last_round = 0;
682 for block_ref in block_ref_iter {
683 if last_round == 0 {
684 last_round = block_ref.round;
685 let block_info = self
686 .recent_blocks
687 .get(block_ref)
688 .expect("Block should exist in recent blocks");
689 blocks[authority_index] = block_info.block.clone();
690 continue;
691 }
692 if block_ref.round < last_round {
693 break;
694 }
695 equivocating_blocks[authority_index].push(*block_ref);
696 }
697 }
698
699 blocks.into_iter().zip(equivocating_blocks).collect()
700 }
701
702 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
705 if slot.round == GENESIS_ROUND {
707 return true;
708 }
709
710 let eviction_round = self.evicted_rounds[slot.authority];
711 if slot.round <= eviction_round {
712 panic!(
713 "{}",
714 format!(
715 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
716 )
717 );
718 }
719
720 let mut result = self.recent_refs_by_authority[slot.authority].range((
721 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
722 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
723 ));
724 result.next().is_some()
725 }
726
727 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730 let mut exist = vec![false; block_refs.len()];
731 let mut missing = Vec::new();
732
733 for (index, block_ref) in block_refs.into_iter().enumerate() {
734 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736 exist[index] = true;
737 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738 {
739 exist[index] = false;
743 } else {
744 missing.push((index, block_ref));
745 }
746 }
747
748 if missing.is_empty() {
749 return exist;
750 }
751
752 let missing_refs = missing
753 .iter()
754 .map(|(_, block_ref)| *block_ref)
755 .collect::<Vec<_>>();
756 let store_results = self
757 .store
758 .contains_blocks(&missing_refs)
759 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
760 self.context
761 .metrics
762 .node_metrics
763 .dag_state_store_read_count
764 .with_label_values(&["contains_blocks"])
765 .inc();
766
767 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
768 exist[index] = result;
769 }
770
771 exist
772 }
773
774 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
775 let blocks = self.contains_blocks(vec![*block_ref]);
776 blocks.first().cloned().unwrap()
777 }
778
779 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
782 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
783 if !block_info.committed {
784 block_info.committed = true;
785 return true;
786 }
787 false
788 } else {
789 panic!(
790 "Block {:?} not found in cache to set as committed.",
791 block_ref
792 );
793 }
794 }
795
796 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
798 self.recent_blocks
799 .get(block_ref)
800 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
801 .committed
802 }
803
804 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
810 let gc_round = self.gc_round();
811 let mut linked_blocks = vec![];
812 let mut targets = VecDeque::new();
813 targets.push_back(root_block);
814 while let Some(block_ref) = targets.pop_front() {
815 if block_ref.round <= gc_round {
823 continue;
824 }
825 let block_info = self
826 .recent_blocks
827 .get_mut(&block_ref)
828 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
829 if block_info.included {
830 continue;
831 }
832 linked_blocks.push(block_ref);
833 block_info.included = true;
834 targets.extend(block_info.block.ancestors().iter());
835 }
836 linked_blocks
837 }
838
839 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
842 self.recent_blocks
843 .get(block_ref)
844 .unwrap_or_else(|| {
845 panic!(
846 "Attempted to query for inclusion status for a block not in cached data {}",
847 block_ref
848 )
849 })
850 .included
851 }
852
853 pub(crate) fn threshold_clock_round(&self) -> Round {
854 self.threshold_clock.get_round()
855 }
856
857 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
859 self.threshold_clock.get_quorum_ts()
860 }
861
862 pub(crate) fn highest_accepted_round(&self) -> Round {
863 self.highest_accepted_round
864 }
865
866 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
869 let time_diff = if let Some(last_commit) = &self.last_commit {
870 if commit.index() <= last_commit.index() {
871 error!(
872 "New commit index {} <= last commit index {}!",
873 commit.index(),
874 last_commit.index()
875 );
876 return;
877 }
878 assert_eq!(commit.index(), last_commit.index() + 1);
879
880 if commit.timestamp_ms() < last_commit.timestamp_ms() {
881 panic!(
882 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
883 last_commit, commit
884 );
885 }
886 commit
887 .timestamp_ms()
888 .saturating_sub(last_commit.timestamp_ms())
889 } else {
890 assert_eq!(commit.index(), 1);
891 0
892 };
893
894 self.context
895 .metrics
896 .node_metrics
897 .last_commit_time_diff
898 .observe(time_diff as f64);
899
900 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
901 previous_commit.round() < commit.round()
902 } else {
903 true
904 };
905
906 self.last_commit = Some(commit.clone());
907
908 if commit_round_advanced {
909 let now = std::time::Instant::now();
910 if let Some(previous_time) = self.last_commit_round_advancement_time {
911 self.context
912 .metrics
913 .node_metrics
914 .commit_round_advancement_interval
915 .observe(now.duration_since(previous_time).as_secs_f64())
916 }
917 self.last_commit_round_advancement_time = Some(now);
918 }
919
920 for block_ref in commit.blocks().iter() {
921 self.last_committed_rounds[block_ref.author] = max(
922 self.last_committed_rounds[block_ref.author],
923 block_ref.round,
924 );
925 }
926
927 for (i, round) in self.last_committed_rounds.iter().enumerate() {
928 let index = self.context.committee.to_authority_index(i).unwrap();
929 let hostname = &self.context.committee.authority(index).hostname;
930 self.context
931 .metrics
932 .node_metrics
933 .last_committed_authority_round
934 .with_label_values(&[hostname])
935 .set((*round).into());
936 }
937
938 self.pending_commit_votes.push_back(commit.reference());
939 self.commits_to_write.push(commit);
940 }
941
942 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
944 self.commits_to_write.extend(commits);
945 }
946
947 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
948 assert!(
949 self.commits_to_write.is_empty(),
950 "Commits to write should be empty. {:?}",
951 self.commits_to_write,
952 );
953 }
954
955 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
956 assert!(self.scoring_subdag.is_empty());
960
961 let commit_info = CommitInfo {
962 committed_rounds: self.last_committed_rounds.clone(),
963 reputation_scores,
964 };
965 let last_commit = self
966 .last_commit
967 .as_ref()
968 .expect("Last commit should already be set.");
969 self.commit_info_to_write
970 .push((last_commit.reference(), commit_info));
971 }
972
973 pub(crate) fn add_finalized_commit(
974 &mut self,
975 commit_ref: CommitRef,
976 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
977 ) {
978 self.finalized_commits_to_write
979 .push((commit_ref, rejected_transactions));
980 }
981
982 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
983 let mut votes = Vec::new();
984 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
985 votes.push(self.pending_commit_votes.pop_front().unwrap());
986 }
987 votes
988 }
989
990 pub(crate) fn last_commit_index(&self) -> CommitIndex {
992 match &self.last_commit {
993 Some(commit) => commit.index(),
994 None => 0,
995 }
996 }
997
998 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1000 match &self.last_commit {
1001 Some(commit) => commit.digest(),
1002 None => CommitDigest::MIN,
1003 }
1004 }
1005
1006 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1008 match &self.last_commit {
1009 Some(commit) => commit.timestamp_ms(),
1010 None => 0,
1011 }
1012 }
1013
1014 pub(crate) fn last_commit_leader(&self) -> Slot {
1016 match &self.last_commit {
1017 Some(commit) => commit.leader().into(),
1018 None => self
1019 .genesis
1020 .iter()
1021 .next()
1022 .map(|(genesis_ref, _)| *genesis_ref)
1023 .expect("Genesis blocks should always be available.")
1024 .into(),
1025 }
1026 }
1027
1028 pub(crate) fn last_commit_round(&self) -> Round {
1030 match &self.last_commit {
1031 Some(commit) => commit.leader().round,
1032 None => 0,
1033 }
1034 }
1035
1036 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1038 self.last_committed_rounds.clone()
1039 }
1040
1041 pub(crate) fn gc_round(&self) -> Round {
1045 self.calculate_gc_round(self.last_commit_round())
1046 }
1047
1048 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1051 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1052 }
1053
1054 pub(crate) fn flush(&mut self) {
1064 let _s = self
1065 .context
1066 .metrics
1067 .node_metrics
1068 .scope_processing_time
1069 .with_label_values(&["DagState::flush"])
1070 .start_timer();
1071
1072 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1074 let pending_commits = std::mem::take(&mut self.commits_to_write);
1075 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1076 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1077 if pending_blocks.is_empty()
1078 && pending_commits.is_empty()
1079 && pending_commit_info.is_empty()
1080 && pending_finalized_commits.is_empty()
1081 {
1082 return;
1083 }
1084
1085 debug!(
1086 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1087 pending_blocks.len(),
1088 pending_blocks
1089 .iter()
1090 .map(|b| b.reference().to_string())
1091 .join(","),
1092 pending_commits.len(),
1093 pending_commits
1094 .iter()
1095 .map(|c| c.reference().to_string())
1096 .join(","),
1097 pending_commit_info.len(),
1098 pending_commit_info
1099 .iter()
1100 .map(|(commit_ref, _)| commit_ref.to_string())
1101 .join(","),
1102 pending_finalized_commits.len(),
1103 pending_finalized_commits
1104 .iter()
1105 .map(|(commit_ref, _)| commit_ref.to_string())
1106 .join(","),
1107 );
1108 self.store
1109 .write(WriteBatch::new(
1110 pending_blocks,
1111 pending_commits,
1112 pending_commit_info,
1113 pending_finalized_commits,
1114 ))
1115 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1116 self.context
1117 .metrics
1118 .node_metrics
1119 .dag_state_store_write_count
1120 .inc();
1121
1122 for (authority_index, _) in self.context.committee.authorities() {
1124 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1125 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1126 if block_ref.round <= eviction_round {
1127 self.recent_blocks.remove(block_ref);
1128 self.recent_refs_by_authority[authority_index].pop_first();
1129 } else {
1130 break;
1131 }
1132 }
1133 self.evicted_rounds[authority_index] = eviction_round;
1134 }
1135
1136 let metrics = &self.context.metrics.node_metrics;
1137 metrics
1138 .dag_state_recent_blocks
1139 .set(self.recent_blocks.len() as i64);
1140 metrics.dag_state_recent_refs.set(
1141 self.recent_refs_by_authority
1142 .iter()
1143 .map(BTreeSet::len)
1144 .sum::<usize>() as i64,
1145 );
1146 }
1147
1148 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1149 self.store
1150 .read_last_commit_info()
1151 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1152 }
1153
1154 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1155 self.scoring_subdag.add_subdags(scoring_subdags);
1156 }
1157
1158 pub(crate) fn clear_scoring_subdag(&mut self) {
1159 self.scoring_subdag.clear();
1160 }
1161
1162 pub(crate) fn scoring_subdags_count(&self) -> usize {
1163 self.scoring_subdag.scored_subdags_count()
1164 }
1165
1166 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1167 self.scoring_subdag.is_empty()
1168 }
1169
1170 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1171 self.scoring_subdag.calculate_distributed_vote_scores()
1172 }
1173
1174 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1175 self.scoring_subdag
1176 .commit_range
1177 .as_ref()
1178 .expect("commit range should exist for scoring subdag")
1179 .end()
1180 }
1181
1182 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1186 let last_round = self.recent_refs_by_authority[authority_index]
1187 .last()
1188 .map(|block_ref| block_ref.round)
1189 .unwrap_or(GENESIS_ROUND);
1190
1191 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1192 }
1193
1194 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1197 gc_round.min(last_round.saturating_sub(cached_rounds))
1198 }
1199
1200 pub(crate) fn store(&self) -> Arc<dyn Store> {
1202 self.store.clone()
1203 }
1204
1205 #[cfg(test)]
1208 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1209 for round in
1212 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213 {
1214 if round == GENESIS_ROUND {
1215 return self.genesis_blocks();
1216 }
1217 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220 let blocks = self.get_uncommitted_blocks_at_round(round);
1222 for block in &blocks {
1223 if quorum.add(block.author(), &self.context.committee) {
1224 return blocks;
1225 }
1226 }
1227 }
1228
1229 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1230 }
1231
1232 #[cfg(test)]
1233 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1234 self.genesis.values().cloned().collect()
1235 }
1236
1237 #[cfg(test)]
1238 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1239 self.last_commit = Some(commit);
1240 }
1241}
1242
1243struct BlockInfo {
1244 block: VerifiedBlock,
1245 committed: bool,
1247 included: bool,
1254}
1255
1256impl BlockInfo {
1257 fn new(block: VerifiedBlock) -> Self {
1258 Self {
1259 block,
1260 committed: false,
1261 included: false,
1262 }
1263 }
1264}
1265
1266#[cfg(test)]
1267mod test {
1268 use std::vec;
1269
1270 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1271 use parking_lot::RwLock;
1272
1273 use super::*;
1274 use crate::{
1275 block::{TestBlock, VerifiedBlock},
1276 storage::{WriteBatch, mem_store::MemStore},
1277 test_dag_builder::DagBuilder,
1278 test_dag_parser::parse_dag,
1279 };
1280
1281 #[tokio::test]
1282 async fn test_get_blocks() {
1283 let (context, _) = Context::new_for_test(4);
1284 let context = Arc::new(context);
1285 let store = Arc::new(MemStore::new());
1286 let mut dag_state = DagState::new(context.clone(), store.clone());
1287 let own_index = AuthorityIndex::new_for_test(0);
1288
1289 let num_rounds: u32 = 10;
1291 let non_existent_round: u32 = 100;
1292 let num_authorities: u32 = 3;
1293 let num_blocks_per_slot: usize = 3;
1294 let mut blocks = BTreeMap::new();
1295 for round in 1..=num_rounds {
1296 for author in 0..num_authorities {
1297 let base_ts = round as BlockTimestampMs * 1000;
1299 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1300 let block = VerifiedBlock::new_for_test(
1301 TestBlock::new(round, author)
1302 .set_timestamp_ms(timestamp)
1303 .build(),
1304 );
1305 dag_state.accept_block(block.clone());
1306 blocks.insert(block.reference(), block);
1307
1308 if AuthorityIndex::new_for_test(author) == own_index {
1310 break;
1311 }
1312 }
1313 }
1314 }
1315
1316 for (r, block) in &blocks {
1318 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1319 }
1320
1321 let last_ref = blocks.keys().last().unwrap();
1323 assert!(
1324 dag_state
1325 .get_block(&BlockRef::new(
1326 last_ref.round,
1327 last_ref.author,
1328 BlockDigest::MIN
1329 ))
1330 .is_none()
1331 );
1332
1333 for round in 1..=num_rounds {
1335 for author in 0..num_authorities {
1336 let slot = Slot::new(
1337 round,
1338 context
1339 .committee
1340 .to_authority_index(author as usize)
1341 .unwrap(),
1342 );
1343 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1344
1345 if AuthorityIndex::new_for_test(author) == own_index {
1347 assert_eq!(blocks.len(), 1);
1348 } else {
1349 assert_eq!(blocks.len(), num_blocks_per_slot);
1350 }
1351
1352 for b in blocks {
1353 assert_eq!(b.round(), round);
1354 assert_eq!(
1355 b.author(),
1356 context
1357 .committee
1358 .to_authority_index(author as usize)
1359 .unwrap()
1360 );
1361 }
1362 }
1363 }
1364
1365 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1367 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1368
1369 for round in 1..=num_rounds {
1371 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1372 assert_eq!(
1375 blocks.len(),
1376 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1377 );
1378 for b in blocks {
1379 assert_eq!(b.round(), round);
1380 }
1381 }
1382
1383 assert!(
1385 dag_state
1386 .get_uncommitted_blocks_at_round(non_existent_round)
1387 .is_empty()
1388 );
1389 }
1390
1391 #[tokio::test]
1392 async fn test_ancestors_at_uncommitted_round() {
1393 let (context, _) = Context::new_for_test(4);
1395 let context = Arc::new(context);
1396 let store = Arc::new(MemStore::new());
1397 let mut dag_state = DagState::new(context.clone(), store.clone());
1398
1399 let round_10_refs: Vec<_> = (0..4)
1403 .map(|a| {
1404 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1405 .reference()
1406 })
1407 .collect();
1408
1409 let round_11 = [
1411 VerifiedBlock::new_for_test(
1413 TestBlock::new(11, 0)
1414 .set_timestamp_ms(1100)
1415 .set_ancestors(round_10_refs.clone())
1416 .build(),
1417 ),
1418 VerifiedBlock::new_for_test(
1421 TestBlock::new(11, 1)
1422 .set_timestamp_ms(1110)
1423 .set_ancestors(round_10_refs.clone())
1424 .build(),
1425 ),
1426 VerifiedBlock::new_for_test(
1428 TestBlock::new(11, 1)
1429 .set_timestamp_ms(1111)
1430 .set_ancestors(round_10_refs.clone())
1431 .build(),
1432 ),
1433 VerifiedBlock::new_for_test(
1435 TestBlock::new(11, 1)
1436 .set_timestamp_ms(1112)
1437 .set_ancestors(round_10_refs.clone())
1438 .build(),
1439 ),
1440 VerifiedBlock::new_for_test(
1442 TestBlock::new(11, 2)
1443 .set_timestamp_ms(1120)
1444 .set_ancestors(round_10_refs.clone())
1445 .build(),
1446 ),
1447 VerifiedBlock::new_for_test(
1449 TestBlock::new(11, 3)
1450 .set_timestamp_ms(1130)
1451 .set_ancestors(round_10_refs.clone())
1452 .build(),
1453 ),
1454 ];
1455
1456 let ancestors_for_round_12 = vec![
1458 round_11[0].reference(),
1459 round_11[1].reference(),
1460 round_11[5].reference(),
1461 ];
1462 let round_12 = [
1463 VerifiedBlock::new_for_test(
1464 TestBlock::new(12, 0)
1465 .set_timestamp_ms(1200)
1466 .set_ancestors(ancestors_for_round_12.clone())
1467 .build(),
1468 ),
1469 VerifiedBlock::new_for_test(
1470 TestBlock::new(12, 2)
1471 .set_timestamp_ms(1220)
1472 .set_ancestors(ancestors_for_round_12.clone())
1473 .build(),
1474 ),
1475 VerifiedBlock::new_for_test(
1476 TestBlock::new(12, 3)
1477 .set_timestamp_ms(1230)
1478 .set_ancestors(ancestors_for_round_12.clone())
1479 .build(),
1480 ),
1481 ];
1482
1483 let ancestors_for_round_13 = vec![
1485 round_12[0].reference(),
1486 round_12[1].reference(),
1487 round_12[2].reference(),
1488 round_11[2].reference(),
1489 ];
1490 let round_13 = [
1491 VerifiedBlock::new_for_test(
1492 TestBlock::new(12, 1)
1493 .set_timestamp_ms(1300)
1494 .set_ancestors(ancestors_for_round_13.clone())
1495 .build(),
1496 ),
1497 VerifiedBlock::new_for_test(
1498 TestBlock::new(12, 2)
1499 .set_timestamp_ms(1320)
1500 .set_ancestors(ancestors_for_round_13.clone())
1501 .build(),
1502 ),
1503 VerifiedBlock::new_for_test(
1504 TestBlock::new(12, 3)
1505 .set_timestamp_ms(1330)
1506 .set_ancestors(ancestors_for_round_13.clone())
1507 .build(),
1508 ),
1509 ];
1510
1511 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1513 let anchor = VerifiedBlock::new_for_test(
1514 TestBlock::new(14, 1)
1515 .set_timestamp_ms(1410)
1516 .set_ancestors(ancestors_for_round_14)
1517 .build(),
1518 );
1519
1520 for b in round_11
1522 .iter()
1523 .chain(round_12.iter())
1524 .chain(round_13.iter())
1525 .chain([anchor.clone()].iter())
1526 {
1527 dag_state.accept_block(b.clone());
1528 }
1529
1530 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1532 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1533 ancestors_refs.sort();
1534 let mut expected_refs = vec![
1535 round_11[0].reference(),
1536 round_11[1].reference(),
1537 round_11[2].reference(),
1538 round_11[5].reference(),
1539 ];
1540 expected_refs.sort(); assert_eq!(
1542 ancestors_refs, expected_refs,
1543 "Expected round 11 ancestors: {:?}. Got: {:?}",
1544 expected_refs, ancestors_refs
1545 );
1546 }
1547
1548 #[tokio::test]
1549 async fn test_link_causal_history() {
1550 let (mut context, _) = Context::new_for_test(4);
1551 context.parameters.dag_state_cached_rounds = 10;
1552 context.protocol_config.set_gc_depth_for_testing(3);
1553 let context = Arc::new(context);
1554
1555 let store = Arc::new(MemStore::new());
1556 let mut dag_state = DagState::new(context.clone(), store.clone());
1557
1558 let mut dag_builder = DagBuilder::new(context.clone());
1560 dag_builder.layers(1..=3).build();
1561 dag_builder
1562 .layers(4..=6)
1563 .authorities(vec![AuthorityIndex::new_for_test(0)])
1564 .skip_block()
1565 .build();
1566
1567 let all_blocks = dag_builder.all_blocks();
1569 dag_state.accept_blocks(all_blocks.clone());
1570
1571 for block in &all_blocks {
1573 assert!(!dag_state.has_been_included(&block.reference()));
1574 }
1575
1576 let round_1_block = &all_blocks[1];
1578 assert_eq!(round_1_block.round(), 1);
1579 let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1580
1581 assert_eq!(linked_blocks.len(), 1);
1583 assert_eq!(linked_blocks[0], round_1_block.reference());
1584 for block_ref in linked_blocks {
1585 assert!(dag_state.has_been_included(&block_ref));
1586 }
1587
1588 let round_2_block = &all_blocks[4];
1590 assert_eq!(round_2_block.round(), 2);
1591 let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1592
1593 assert_eq!(linked_blocks.len(), 4);
1595 for block_ref in linked_blocks {
1596 assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1597 }
1598
1599 for block in &all_blocks {
1601 if block.round() == 1 || block.reference() == round_2_block.reference() {
1602 assert!(dag_state.has_been_included(&block.reference()));
1603 } else {
1604 assert!(!dag_state.has_been_included(&block.reference()));
1605 }
1606 }
1607
1608 let round_6_block = all_blocks.last().unwrap();
1610 assert_eq!(round_6_block.round(), 6);
1611
1612 let last_commit = TrustedCommit::new_for_test(
1614 6,
1615 CommitDigest::MIN,
1616 context.clock.timestamp_utc_ms(),
1617 round_6_block.reference(),
1618 vec![],
1619 );
1620 dag_state.set_last_commit(last_commit);
1621 assert_eq!(
1622 dag_state.gc_round(),
1623 3,
1624 "GC round should have moved to round 3"
1625 );
1626
1627 let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1629
1630 assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1632 for block_ref in linked_blocks {
1633 assert!(
1634 block_ref.round == 4
1635 || block_ref.round == 5
1636 || block_ref == round_6_block.reference()
1637 );
1638 }
1639
1640 for block in &all_blocks {
1642 let block_ref = block.reference();
1643 if block.round() == 1
1644 || block_ref == round_2_block.reference()
1645 || block_ref.round == 4
1646 || block_ref.round == 5
1647 || block_ref == round_6_block.reference()
1648 {
1649 assert!(dag_state.has_been_included(&block.reference()));
1650 } else {
1651 assert!(!dag_state.has_been_included(&block.reference()));
1652 }
1653 }
1654 }
1655
1656 #[tokio::test]
1657 async fn test_contains_blocks_in_cache_or_store() {
1658 const CACHED_ROUNDS: Round = 2;
1660
1661 let (mut context, _) = Context::new_for_test(4);
1662 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1663
1664 let context = Arc::new(context);
1665 let store = Arc::new(MemStore::new());
1666 let mut dag_state = DagState::new(context.clone(), store.clone());
1667
1668 let num_rounds: u32 = 10;
1670 let num_authorities: u32 = 4;
1671 let mut blocks = Vec::new();
1672
1673 for round in 1..=num_rounds {
1674 for author in 0..num_authorities {
1675 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1676 blocks.push(block);
1677 }
1678 }
1679
1680 blocks.clone().into_iter().for_each(|block| {
1682 if block.round() <= 4 {
1683 store
1684 .write(WriteBatch::default().blocks(vec![block]))
1685 .unwrap();
1686 } else {
1687 dag_state.accept_blocks(vec![block]);
1688 }
1689 });
1690
1691 let mut block_refs = blocks
1694 .iter()
1695 .map(|block| block.reference())
1696 .collect::<Vec<_>>();
1697 let result = dag_state.contains_blocks(block_refs.clone());
1698
1699 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1701 assert_eq!(result, expected);
1702
1703 block_refs.insert(
1705 3,
1706 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1707 );
1708 let result = dag_state.contains_blocks(block_refs.clone());
1709
1710 expected.insert(3, false);
1712 assert_eq!(result, expected.clone());
1713 }
1714
1715 #[tokio::test]
1716 async fn test_contains_cached_block_at_slot() {
1717 const CACHED_ROUNDS: Round = 2;
1719
1720 let num_authorities: u32 = 4;
1721 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1722 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1723
1724 let context = Arc::new(context);
1725 let store = Arc::new(MemStore::new());
1726 let mut dag_state = DagState::new(context.clone(), store.clone());
1727
1728 let num_rounds: u32 = 10;
1730 let mut blocks = Vec::new();
1731
1732 for round in 1..=num_rounds {
1733 for author in 0..num_authorities {
1734 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1735 blocks.push(block.clone());
1736 dag_state.accept_block(block);
1737 }
1738 }
1739
1740 for (author, _) in context.committee.authorities() {
1742 assert!(
1743 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1744 "Genesis should always be found"
1745 );
1746 }
1747
1748 let mut block_refs = blocks
1751 .iter()
1752 .map(|block| block.reference())
1753 .collect::<Vec<_>>();
1754
1755 for block_ref in block_refs.clone() {
1756 let slot = block_ref.into();
1757 let found = dag_state.contains_cached_block_at_slot(slot);
1758 assert!(found, "A block should be found at slot {}", slot);
1759 }
1760
1761 block_refs.insert(
1764 3,
1765 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1766 );
1767 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1768 expected.insert(3, false);
1769
1770 for block_ref in block_refs {
1772 let slot = block_ref.into();
1773 let found = dag_state.contains_cached_block_at_slot(slot);
1774
1775 assert_eq!(expected.remove(0), found);
1776 }
1777 }
1778
1779 #[tokio::test]
1780 #[ignore]
1781 #[should_panic(
1782 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1783 )]
1784 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1785 const GC_DEPTH: u32 = 2;
1788 const CACHED_ROUNDS: Round = 3;
1790
1791 let (mut context, _) = Context::new_for_test(4);
1792 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1793 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1794
1795 let context = Arc::new(context);
1796 let store = Arc::new(MemStore::new());
1797 let mut dag_state = DagState::new(context.clone(), store.clone());
1798
1799 let mut dag_builder = DagBuilder::new(context.clone());
1801 dag_builder.layers(1..=3).build();
1802 dag_builder
1803 .layers(4..=6)
1804 .authorities(vec![AuthorityIndex::new_for_test(0)])
1805 .skip_block()
1806 .build();
1807
1808 dag_builder
1810 .all_blocks()
1811 .into_iter()
1812 .for_each(|block| dag_state.accept_block(block));
1813
1814 dag_state.add_commit(TrustedCommit::new_for_test(
1816 1 as CommitIndex,
1817 CommitDigest::MIN,
1818 0,
1819 dag_builder.leader_block(5).unwrap().reference(),
1820 vec![],
1821 ));
1822 dag_state.flush();
1824
1825 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1827
1828 for authority_index in 1..=3 {
1832 for round in 4..=6 {
1833 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1834 round,
1835 AuthorityIndex::new_for_test(authority_index)
1836 )));
1837 }
1838 }
1839
1840 for round in 1..=3 {
1841 assert!(
1842 dag_state.contains_cached_block_at_slot(Slot::new(
1843 round,
1844 AuthorityIndex::new_for_test(0)
1845 ))
1846 );
1847 }
1848
1849 let _ =
1852 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1853 }
1854
1855 #[tokio::test]
1856 async fn test_get_blocks_in_cache_or_store() {
1857 let (context, _) = Context::new_for_test(4);
1858 let context = Arc::new(context);
1859 let store = Arc::new(MemStore::new());
1860 let mut dag_state = DagState::new(context.clone(), store.clone());
1861
1862 let num_rounds: u32 = 10;
1864 let num_authorities: u32 = 4;
1865 let mut blocks = Vec::new();
1866
1867 for round in 1..=num_rounds {
1868 for author in 0..num_authorities {
1869 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1870 blocks.push(block);
1871 }
1872 }
1873
1874 blocks.clone().into_iter().for_each(|block| {
1876 if block.round() <= 4 {
1877 store
1878 .write(WriteBatch::default().blocks(vec![block]))
1879 .unwrap();
1880 } else {
1881 dag_state.accept_blocks(vec![block]);
1882 }
1883 });
1884
1885 let mut block_refs = blocks
1888 .iter()
1889 .map(|block| block.reference())
1890 .collect::<Vec<_>>();
1891 let result = dag_state.get_blocks(&block_refs);
1892
1893 let mut expected = blocks
1894 .into_iter()
1895 .map(Some)
1896 .collect::<Vec<Option<VerifiedBlock>>>();
1897
1898 assert_eq!(result, expected.clone());
1900
1901 block_refs.insert(
1903 3,
1904 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1905 );
1906 let result = dag_state.get_blocks(&block_refs);
1907
1908 expected.insert(3, None);
1910 assert_eq!(result, expected);
1911 }
1912
1913 #[tokio::test]
1914 async fn test_flush_and_recovery() {
1915 telemetry_subscribers::init_for_testing();
1916
1917 const GC_DEPTH: u32 = 3;
1918 const CACHED_ROUNDS: u32 = 4;
1919
1920 let num_authorities: u32 = 4;
1921 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1922 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1923 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1924
1925 let context = Arc::new(context);
1926
1927 let store = Arc::new(MemStore::new());
1928 let mut dag_state = DagState::new(context.clone(), store.clone());
1929
1930 const NUM_ROUNDS: Round = 20;
1931 let mut dag_builder = DagBuilder::new(context.clone());
1932 dag_builder.layers(1..=5).build();
1933 dag_builder
1934 .layers(6..=8)
1935 .authorities(vec![AuthorityIndex::new_for_test(0)])
1936 .skip_block()
1937 .build();
1938 dag_builder.layers(9..=NUM_ROUNDS).build();
1939
1940 const LAST_COMMIT_ROUND: Round = 16;
1942 const LAST_COMMIT_INDEX: CommitIndex = 15;
1943 let commits = dag_builder
1944 .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1945 .into_iter()
1946 .map(|(_subdag, commit)| commit)
1947 .take(LAST_COMMIT_INDEX as usize)
1948 .collect::<Vec<_>>();
1949 assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1950 assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1951
1952 const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1956 const NUM_PERSISTED_COMMITS: usize = 8;
1957 const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1958 const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1959 dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1960 let mut finalized_commits = vec![];
1961 for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1962 finalized_commits.push(commit.clone());
1963 dag_state.add_commit(commit);
1964 }
1965 let last_finalized_commit = finalized_commits.last().unwrap();
1966 assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1967 assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1968
1969 let finalized_blocks = finalized_commits
1971 .iter()
1972 .flat_map(|commit| commit.blocks())
1973 .collect::<BTreeSet<_>>();
1974
1975 dag_state.flush();
1977
1978 let store_blocks = store
1980 .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1981 .unwrap();
1982 assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1983 let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1984 assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1985 assert_eq!(
1986 store_commits.last().unwrap().index(),
1987 LAST_PERSISTED_COMMIT_INDEX
1988 );
1989 assert_eq!(
1990 store_commits.last().unwrap().round(),
1991 LAST_PERSISTED_COMMIT_ROUND
1992 );
1993
1994 dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1996 for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1997 dag_state.add_commit(commit);
1998 }
1999
2000 let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
2002 let block_refs = all_blocks
2003 .iter()
2004 .map(|block| block.reference())
2005 .collect::<Vec<_>>();
2006 let result = dag_state
2007 .get_blocks(&block_refs)
2008 .into_iter()
2009 .map(|b| b.unwrap())
2010 .collect::<Vec<_>>();
2011 assert_eq!(result, all_blocks);
2012
2013 assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2015
2016 drop(dag_state);
2018
2019 let dag_state = DagState::new(context.clone(), store.clone());
2021
2022 let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2024 let block_refs = all_blocks
2025 .iter()
2026 .map(|block| block.reference())
2027 .collect::<Vec<_>>();
2028 let result = dag_state
2029 .get_blocks(&block_refs)
2030 .into_iter()
2031 .map(|b| b.unwrap())
2032 .collect::<Vec<_>>();
2033 assert_eq!(result, all_blocks);
2034
2035 let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2037 let block_refs = missing_blocks
2038 .iter()
2039 .map(|block| block.reference())
2040 .collect::<Vec<_>>();
2041 let retrieved_blocks = dag_state
2042 .get_blocks(&block_refs)
2043 .into_iter()
2044 .flatten()
2045 .collect::<Vec<_>>();
2046 assert!(retrieved_blocks.is_empty());
2047
2048 assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2050 assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2051
2052 let expected_last_committed_rounds = vec![5, 9, 8, 8];
2054 assert_eq!(
2055 dag_state.last_committed_rounds(),
2056 expected_last_committed_rounds
2057 );
2058 assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2060
2061 for (authority_index, _) in context.committee.authorities() {
2063 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2064
2065 if authority_index == AuthorityIndex::new_for_test(0) {
2069 assert_eq!(blocks.len(), 4);
2070 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2071 assert!(
2072 blocks
2073 .into_iter()
2074 .all(|block| block.round() >= 7 && block.round() <= 12)
2075 );
2076 } else {
2077 assert_eq!(blocks.len(), 6);
2078 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2079 assert!(
2080 blocks
2081 .into_iter()
2082 .all(|block| block.round() >= 7 && block.round() <= 12)
2083 );
2084 }
2085 }
2086
2087 let gc_round = dag_state.gc_round();
2089 assert_eq!(gc_round, 6);
2090 dag_state
2091 .recent_blocks
2092 .iter()
2093 .for_each(|(block_ref, block_info)| {
2094 if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2095 assert!(
2096 block_info.committed,
2097 "Block {:?} should be set as committed",
2098 block_ref
2099 );
2100 }
2101 });
2102
2103 dag_state
2108 .recent_blocks
2109 .iter()
2110 .for_each(|(block_ref, block_info)| {
2111 if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2112 assert!(block_info.included);
2113 } else {
2114 assert!(!block_info.included);
2115 }
2116 });
2117 }
2118
2119 #[tokio::test]
2120 async fn test_block_info_as_committed() {
2121 let num_authorities: u32 = 4;
2122 let (context, _) = Context::new_for_test(num_authorities as usize);
2123 let context = Arc::new(context);
2124
2125 let store = Arc::new(MemStore::new());
2126 let mut dag_state = DagState::new(context.clone(), store.clone());
2127
2128 let block = VerifiedBlock::new_for_test(
2130 TestBlock::new(1, 0)
2131 .set_timestamp_ms(1000)
2132 .set_ancestors(vec![])
2133 .build(),
2134 );
2135
2136 dag_state.accept_block(block.clone());
2137
2138 assert!(!dag_state.is_committed(&block.reference()));
2140
2141 assert!(
2143 dag_state.set_committed(&block.reference()),
2144 "Block should be successfully set as committed for first time"
2145 );
2146
2147 assert!(dag_state.is_committed(&block.reference()));
2149
2150 assert!(
2152 !dag_state.set_committed(&block.reference()),
2153 "Block should not be successfully set as committed"
2154 );
2155 }
2156
2157 #[tokio::test]
2158 async fn test_get_cached_blocks() {
2159 let (mut context, _) = Context::new_for_test(4);
2160 context.parameters.dag_state_cached_rounds = 5;
2161
2162 let context = Arc::new(context);
2163 let store = Arc::new(MemStore::new());
2164 let mut dag_state = DagState::new(context.clone(), store.clone());
2165
2166 let mut all_blocks = Vec::new();
2171 for author in 1..=3 {
2172 for round in 10..(10 + author) {
2173 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2174 all_blocks.push(block.clone());
2175 dag_state.accept_block(block);
2176 }
2177 }
2178
2179 let cached_blocks =
2182 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2183 assert!(cached_blocks.is_empty());
2184
2185 let cached_blocks =
2186 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2187 assert_eq!(cached_blocks.len(), 1);
2188 assert_eq!(cached_blocks[0].round(), 10);
2189
2190 let cached_blocks =
2191 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2192 assert_eq!(cached_blocks.len(), 2);
2193 assert_eq!(cached_blocks[0].round(), 10);
2194 assert_eq!(cached_blocks[1].round(), 11);
2195
2196 let cached_blocks =
2197 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2198 assert_eq!(cached_blocks.len(), 1);
2199 assert_eq!(cached_blocks[0].round(), 11);
2200
2201 let cached_blocks =
2202 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2203 assert_eq!(cached_blocks.len(), 3);
2204 assert_eq!(cached_blocks[0].round(), 10);
2205 assert_eq!(cached_blocks[1].round(), 11);
2206 assert_eq!(cached_blocks[2].round(), 12);
2207
2208 let cached_blocks =
2209 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2210 assert_eq!(cached_blocks.len(), 1);
2211 assert_eq!(cached_blocks[0].round(), 12);
2212
2213 let cached_blocks = dag_state.get_cached_blocks_in_range(
2217 context.committee.to_authority_index(3).unwrap(),
2218 10,
2219 10,
2220 1,
2221 );
2222 assert!(cached_blocks.is_empty());
2223
2224 let cached_blocks = dag_state.get_cached_blocks_in_range(
2226 context.committee.to_authority_index(3).unwrap(),
2227 11,
2228 10,
2229 1,
2230 );
2231 assert!(cached_blocks.is_empty());
2232
2233 let cached_blocks = dag_state.get_cached_blocks_in_range(
2235 context.committee.to_authority_index(0).unwrap(),
2236 9,
2237 10,
2238 1,
2239 );
2240 assert!(cached_blocks.is_empty());
2241
2242 let cached_blocks = dag_state.get_cached_blocks_in_range(
2244 context.committee.to_authority_index(1).unwrap(),
2245 9,
2246 11,
2247 1,
2248 );
2249 assert_eq!(cached_blocks.len(), 1);
2250 assert_eq!(cached_blocks[0].round(), 10);
2251
2252 let cached_blocks = dag_state.get_cached_blocks_in_range(
2254 context.committee.to_authority_index(2).unwrap(),
2255 9,
2256 12,
2257 5,
2258 );
2259 assert_eq!(cached_blocks.len(), 2);
2260 assert_eq!(cached_blocks[0].round(), 10);
2261 assert_eq!(cached_blocks[1].round(), 11);
2262
2263 let cached_blocks = dag_state.get_cached_blocks_in_range(
2265 context.committee.to_authority_index(3).unwrap(),
2266 11,
2267 20,
2268 5,
2269 );
2270 assert_eq!(cached_blocks.len(), 2);
2271 assert_eq!(cached_blocks[0].round(), 11);
2272 assert_eq!(cached_blocks[1].round(), 12);
2273
2274 let cached_blocks = dag_state.get_cached_blocks_in_range(
2276 context.committee.to_authority_index(3).unwrap(),
2277 10,
2278 20,
2279 1,
2280 );
2281 assert_eq!(cached_blocks.len(), 1);
2282 assert_eq!(cached_blocks[0].round(), 10);
2283 }
2284
2285 #[tokio::test]
2286 async fn test_get_last_cached_block() {
2287 const CACHED_ROUNDS: Round = 2;
2289 const GC_DEPTH: u32 = 1;
2290 let (mut context, _) = Context::new_for_test(4);
2291 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2292 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2293
2294 let context = Arc::new(context);
2295 let store = Arc::new(MemStore::new());
2296 let mut dag_state = DagState::new(context.clone(), store.clone());
2297
2298 let dag_str = "DAG {
2303 Round 0 : { 4 },
2304 Round 1 : {
2305 B -> [*],
2306 C -> [*],
2307 D -> [*],
2308 },
2309 Round 2 : {
2310 C -> [*],
2311 D -> [*],
2312 },
2313 Round 3 : {
2314 D -> [*],
2315 },
2316 }";
2317
2318 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2319
2320 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2322
2323 for block in dag_builder
2325 .all_blocks()
2326 .into_iter()
2327 .chain(std::iter::once(block))
2328 {
2329 dag_state.accept_block(block);
2330 }
2331
2332 dag_state.add_commit(TrustedCommit::new_for_test(
2333 1 as CommitIndex,
2334 CommitDigest::MIN,
2335 context.clock.timestamp_utc_ms(),
2336 dag_builder.leader_block(3).unwrap().reference(),
2337 vec![],
2338 ));
2339
2340 let end_round = 4;
2342 let expected_rounds = vec![0, 1, 2, 3];
2343 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2344 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2346 assert_eq!(
2347 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2348 expected_rounds
2349 );
2350 assert_eq!(
2351 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2352 expected_excluded_and_equivocating_blocks
2353 );
2354
2355 for (i, expected_round) in expected_rounds.iter().enumerate() {
2357 let round = dag_state
2358 .get_last_cached_block_in_range(
2359 context.committee.to_authority_index(i).unwrap(),
2360 0,
2361 end_round,
2362 )
2363 .map(|b| b.round())
2364 .unwrap_or_default();
2365 assert_eq!(round, *expected_round, "Authority {i}");
2366 }
2367
2368 let start_round = 2;
2370 let expected_rounds = [0, 0, 2, 3];
2371
2372 for (i, expected_round) in expected_rounds.iter().enumerate() {
2374 let round = dag_state
2375 .get_last_cached_block_in_range(
2376 context.committee.to_authority_index(i).unwrap(),
2377 start_round,
2378 end_round,
2379 )
2380 .map(|b| b.round())
2381 .unwrap_or_default();
2382 assert_eq!(round, *expected_round, "Authority {i}");
2383 }
2384
2385 dag_state.flush();
2391
2392 let end_round = 3;
2394 let expected_rounds = vec![0, 1, 2, 2];
2395
2396 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2398 assert_eq!(
2399 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2400 expected_rounds
2401 );
2402
2403 for (i, expected_round) in expected_rounds.iter().enumerate() {
2405 let round = dag_state
2406 .get_last_cached_block_in_range(
2407 context.committee.to_authority_index(i).unwrap(),
2408 0,
2409 end_round,
2410 )
2411 .map(|b| b.round())
2412 .unwrap_or_default();
2413 assert_eq!(round, *expected_round, "Authority {i}");
2414 }
2415 }
2416
2417 #[tokio::test]
2418 #[should_panic(
2419 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2420 )]
2421 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2422 const CACHED_ROUNDS: Round = 1;
2424 const GC_DEPTH: u32 = 1;
2425 let (mut context, _) = Context::new_for_test(4);
2426 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2427 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2428
2429 let context = Arc::new(context);
2430 let store = Arc::new(MemStore::new());
2431 let mut dag_state = DagState::new(context.clone(), store.clone());
2432
2433 let mut dag_builder = DagBuilder::new(context.clone());
2438 dag_builder
2439 .layers(1..=1)
2440 .authorities(vec![AuthorityIndex::new_for_test(0)])
2441 .skip_block()
2442 .build();
2443 dag_builder
2444 .layers(2..=2)
2445 .authorities(vec![
2446 AuthorityIndex::new_for_test(0),
2447 AuthorityIndex::new_for_test(1),
2448 ])
2449 .skip_block()
2450 .build();
2451 dag_builder
2452 .layers(3..=3)
2453 .authorities(vec![
2454 AuthorityIndex::new_for_test(0),
2455 AuthorityIndex::new_for_test(1),
2456 AuthorityIndex::new_for_test(2),
2457 ])
2458 .skip_block()
2459 .build();
2460
2461 for block in dag_builder.all_blocks() {
2463 dag_state.accept_block(block);
2464 }
2465
2466 dag_state.add_commit(TrustedCommit::new_for_test(
2467 1 as CommitIndex,
2468 CommitDigest::MIN,
2469 0,
2470 dag_builder.leader_block(3).unwrap().reference(),
2471 vec![],
2472 ));
2473
2474 dag_state.flush();
2476
2477 dag_state.get_last_cached_block_per_authority(2);
2479 }
2480
2481 #[tokio::test]
2482 async fn test_last_quorum() {
2483 let (context, _) = Context::new_for_test(4);
2485 let context = Arc::new(context);
2486 let store = Arc::new(MemStore::new());
2487 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2488
2489 {
2491 let genesis = genesis_blocks(context.as_ref());
2492
2493 assert_eq!(dag_state.read().last_quorum(), genesis);
2494 }
2495
2496 {
2498 let mut dag_builder = DagBuilder::new(context.clone());
2499 dag_builder
2500 .layers(1..=4)
2501 .build()
2502 .persist_layers(dag_state.clone());
2503 let round_4_blocks: Vec<_> = dag_builder
2504 .blocks(4..=4)
2505 .into_iter()
2506 .map(|block| block.reference())
2507 .collect();
2508
2509 let last_quorum = dag_state.read().last_quorum();
2510
2511 assert_eq!(
2512 last_quorum
2513 .into_iter()
2514 .map(|block| block.reference())
2515 .collect::<Vec<_>>(),
2516 round_4_blocks
2517 );
2518 }
2519
2520 {
2522 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2523 dag_state.write().accept_block(block);
2524
2525 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2526
2527 let last_quorum = dag_state.read().last_quorum();
2528
2529 assert_eq!(last_quorum, round_4_blocks);
2530 }
2531 }
2532
2533 #[tokio::test]
2534 async fn test_last_block_for_authority() {
2535 let (context, _) = Context::new_for_test(4);
2537 let context = Arc::new(context);
2538 let store = Arc::new(MemStore::new());
2539 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2540
2541 {
2543 let genesis = genesis_blocks(context.as_ref());
2544 let my_genesis = genesis
2545 .into_iter()
2546 .find(|block| block.author() == context.own_index)
2547 .unwrap();
2548
2549 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2550 }
2551
2552 {
2554 let mut dag_builder = DagBuilder::new(context.clone());
2556 dag_builder
2557 .layers(1..=4)
2558 .build()
2559 .persist_layers(dag_state.clone());
2560
2561 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2563 dag_state.write().accept_block(block);
2564
2565 let block = dag_state
2566 .read()
2567 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2568 assert_eq!(block.round(), 5);
2569
2570 for (authority_index, _) in context.committee.authorities() {
2571 let block = dag_state
2572 .read()
2573 .get_last_block_for_authority(authority_index);
2574
2575 if authority_index.value() == 0 {
2576 assert_eq!(block.round(), 5);
2577 } else {
2578 assert_eq!(block.round(), 4);
2579 }
2580 }
2581 }
2582 }
2583
2584 #[tokio::test]
2585 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2586 let (context, _) = Context::new_for_test(4);
2588 let context = Arc::new(context);
2589 let store = Arc::new(MemStore::new());
2590 let mut dag_state = DagState::new(context.clone(), store.clone());
2591
2592 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2594
2595 let block = VerifiedBlock::new_for_test(
2596 TestBlock::new(10, 0)
2597 .set_timestamp_ms(block_timestamp)
2598 .build(),
2599 );
2600
2601 dag_state.accept_block(block);
2603 }
2604
2605 #[tokio::test]
2606 async fn test_last_finalized_commit() {
2607 let (context, _) = Context::new_for_test(4);
2609 let context = Arc::new(context);
2610 let store = Arc::new(MemStore::new());
2611 let mut dag_state = DagState::new(context.clone(), store.clone());
2612
2613 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2615 let rejected_transactions = BTreeMap::new();
2616 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2617
2618 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2620 assert_eq!(
2621 dag_state.finalized_commits_to_write[0],
2622 (commit_ref, rejected_transactions.clone())
2623 );
2624
2625 dag_state.flush();
2627
2628 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2630 assert_eq!(last_finalized_commit, Some(commit_ref));
2631 let stored_rejected_transactions = store
2632 .read_rejected_transactions(commit_ref)
2633 .unwrap()
2634 .unwrap();
2635 assert_eq!(stored_rejected_transactions, rejected_transactions);
2636 }
2637}