1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21 CommittedSubDag,
22 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23 commit::{
24 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26 },
27 context::Context,
28 leader_scoring::{ReputationScores, ScoringSubdag},
29 storage::{Store, WriteBatch},
30 threshold_clock::ThresholdClock,
31};
32
33pub struct DagState {
41 context: Arc<Context>,
42
43 genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59 threshold_clock: ThresholdClock,
61
62 evicted_rounds: Vec<Round>,
66
67 highest_accepted_round: Round,
69
70 last_commit: Option<TrustedCommit>,
72
73 last_commit_round_advancement_time: Option<std::time::Instant>,
75
76 last_committed_rounds: Vec<Round>,
78
79 scoring_subdag: ScoringSubdag,
82
83 pending_commit_votes: VecDeque<CommitVote>,
87
88 blocks_to_write: Vec<VerifiedBlock>,
91 commits_to_write: Vec<TrustedCommit>,
92
93 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98 finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101 store: Arc<dyn Store>,
103
104 cached_rounds: Round,
106}
107
108impl DagState {
109 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112 let num_authorities = context.committee.size();
113
114 let genesis = genesis_blocks(context.as_ref())
115 .into_iter()
116 .map(|block| (block.reference(), block))
117 .collect();
118
119 let threshold_clock = ThresholdClock::new(1, context.clone());
120
121 let last_commit = store
122 .read_last_commit()
123 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125 let commit_info = store
126 .read_last_commit_info()
127 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128 let (mut last_committed_rounds, commit_recovery_start_index) =
129 if let Some((commit_ref, commit_info)) = commit_info {
130 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131 (commit_info.committed_rounds, commit_ref.index + 1)
132 } else {
133 tracing::info!("Found no stored CommitInfo to recover from");
134 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135 };
136
137 let mut unscored_committed_subdags = Vec::new();
138 let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140 if let Some(last_commit) = last_commit.as_ref() {
141 store
142 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144 .iter()
145 .for_each(|commit| {
146 for block_ref in commit.blocks() {
147 last_committed_rounds[block_ref.author] =
148 max(last_committed_rounds[block_ref.author], block_ref.round);
149 }
150 let committed_subdag = load_committed_subdag_from_store(
151 &context,
152 store.as_ref(),
153 commit.clone(),
154 vec![],
155 );
156 unscored_committed_subdags.push(committed_subdag);
158 });
159 }
160
161 tracing::info!(
162 "DagState was initialized with the following state: \
163 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
164 unscored_committed_subdags.len()
165 );
166
167 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
168
169 let mut state = Self {
170 context: context.clone(),
171 genesis,
172 recent_blocks: BTreeMap::new(),
173 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
174 threshold_clock,
175 highest_accepted_round: 0,
176 last_commit: last_commit.clone(),
177 last_commit_round_advancement_time: None,
178 last_committed_rounds: last_committed_rounds.clone(),
179 pending_commit_votes: VecDeque::new(),
180 blocks_to_write: vec![],
181 commits_to_write: vec![],
182 commit_info_to_write: vec![],
183 finalized_commits_to_write: vec![],
184 scoring_subdag,
185 store: store.clone(),
186 cached_rounds,
187 evicted_rounds: vec![0; num_authorities],
188 };
189
190 for (authority_index, _) in context.committee.authorities() {
191 let (blocks, eviction_round) = {
192 let last_block = state
195 .store
196 .scan_last_blocks_by_author(authority_index, 1, None)
197 .expect("Database error");
198 let last_block_round = last_block
199 .last()
200 .map(|b| b.round())
201 .unwrap_or(GENESIS_ROUND);
202
203 let eviction_round =
204 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
205 let blocks = state
206 .store
207 .scan_blocks_by_author(authority_index, eviction_round + 1)
208 .expect("Database error");
209
210 (blocks, eviction_round)
211 };
212
213 state.evicted_rounds[authority_index] = eviction_round;
214
215 for block in &blocks {
217 state.update_block_metadata(block);
218 }
219
220 debug!(
221 "Recovered blocks {}: {:?}",
222 authority_index,
223 blocks
224 .iter()
225 .map(|b| b.reference())
226 .collect::<Vec<BlockRef>>()
227 );
228 }
229
230 if let Some(last_commit) = last_commit {
231 let mut index = last_commit.index();
232 let gc_round = state.gc_round();
233 info!(
234 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
235 index, gc_round
236 );
237
238 loop {
239 let commits = store
240 .scan_commits((index..=index).into())
241 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
242 let Some(commit) = commits.first() else {
243 info!("Recovering finished up to index {index}, no more commits to recover");
244 break;
245 };
246
247 if gc_round > 0 && commit.leader().round <= gc_round {
249 info!(
250 "Recovering finished, reached commit leader round {} <= gc_round {}",
251 commit.leader().round,
252 gc_round
253 );
254 break;
255 }
256
257 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
258 debug!(
259 "Setting block {:?} as committed based on commit {:?}",
260 block_ref,
261 commit.index()
262 );
263 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
264 });
265
266 index = index.saturating_sub(1);
268 if index == 0 {
269 break;
270 }
271 }
272 }
273
274 let proposed_blocks = store
276 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
277 .expect("Database error");
278 for block in proposed_blocks {
279 state.link_causal_history(block.reference());
280 }
281
282 state
283 }
284
285 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
287 assert_ne!(
288 block.round(),
289 0,
290 "Genesis block should not be accepted into DAG."
291 );
292
293 let block_ref = block.reference();
294 if self.contains_block(&block_ref) {
295 return;
296 }
297
298 let now = self.context.clock.timestamp_utc_ms();
299 if block.timestamp_ms() > now {
300 trace!(
301 "Block {:?} with timestamp {} is greater than local timestamp {}.",
302 block,
303 block.timestamp_ms(),
304 now,
305 );
306 }
307 let hostname = &self.context.committee.authority(block_ref.author).hostname;
308 self.context
309 .metrics
310 .node_metrics
311 .accepted_block_time_drift_ms
312 .with_label_values(&[hostname])
313 .inc_by(block.timestamp_ms().saturating_sub(now));
314
315 if block_ref.author == self.context.own_index {
318 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
319 if !self
320 .context
321 .parameters
322 .internal
323 .skip_equivocation_validation
324 {
325 assert!(
326 existing_blocks.is_empty(),
327 "Block Rejected! Attempted to add block {block:#?} to own slot where \
328 block(s) {existing_blocks:#?} already exists."
329 );
330 }
331 }
332 self.update_block_metadata(&block);
333 self.blocks_to_write.push(block);
334 let source = if self.context.own_index == block_ref.author {
335 "own"
336 } else {
337 "others"
338 };
339 self.context
340 .metrics
341 .node_metrics
342 .accepted_blocks
343 .with_label_values(&[source])
344 .inc();
345 }
346
347 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
349 let block_ref = block.reference();
350 self.recent_blocks
351 .insert(block_ref, BlockInfo::new(block.clone()));
352 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
353
354 if self.threshold_clock.add_block(block_ref) {
355 let last_proposed_block = self.get_last_proposed_block();
357 if last_proposed_block.round() == block_ref.round {
358 let quorum_delay_ms = self
359 .context
360 .clock
361 .timestamp_utc_ms()
362 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
363 self.context
364 .metrics
365 .node_metrics
366 .quorum_receive_latency
367 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
368 }
369 }
370
371 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
372 self.context
373 .metrics
374 .node_metrics
375 .highest_accepted_round
376 .set(self.highest_accepted_round as i64);
377
378 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
379 .last()
380 .map(|block_ref| block_ref.round)
381 .expect("There should be by now at least one block ref");
382 let hostname = &self.context.committee.authority(block_ref.author).hostname;
383 self.context
384 .metrics
385 .node_metrics
386 .highest_accepted_authority_round
387 .with_label_values(&[hostname])
388 .set(highest_accepted_round_for_author as i64);
389 }
390
391 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
393 debug!(
394 "Accepting blocks: {}",
395 blocks.iter().map(|b| b.reference().to_string()).join(",")
396 );
397 for block in blocks {
398 self.accept_block(block);
399 }
400 }
401
402 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
405 self.get_blocks(&[*reference])
406 .pop()
407 .expect("Exactly one element should be returned")
408 }
409
410 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
413 let mut blocks = vec![None; block_refs.len()];
414 let mut missing = Vec::new();
415
416 for (index, block_ref) in block_refs.iter().enumerate() {
417 if block_ref.round == GENESIS_ROUND {
418 if let Some(block) = self.genesis.get(block_ref) {
420 blocks[index] = Some(block.clone());
421 }
422 continue;
423 }
424 if let Some(block_info) = self.recent_blocks.get(block_ref) {
425 blocks[index] = Some(block_info.block.clone());
426 continue;
427 }
428 missing.push((index, block_ref));
429 }
430
431 if missing.is_empty() {
432 return blocks;
433 }
434
435 let missing_refs = missing
436 .iter()
437 .map(|(_, block_ref)| **block_ref)
438 .collect::<Vec<_>>();
439 let store_results = self
440 .store
441 .read_blocks(&missing_refs)
442 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
443 self.context
444 .metrics
445 .node_metrics
446 .dag_state_store_read_count
447 .with_label_values(&["get_blocks"])
448 .inc();
449
450 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
451 blocks[index] = result;
452 }
453
454 blocks
455 }
456
457 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
460 let mut blocks = vec![];
464 for (_block_ref, block_info) in self.recent_blocks.range((
465 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
466 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
467 )) {
468 blocks.push(block_info.block.clone())
469 }
470 blocks
471 }
472
473 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
476 if round <= self.last_commit_round() {
477 panic!("Round {} have committed blocks!", round);
478 }
479
480 let mut blocks = vec![];
481 for (_block_ref, block_info) in self.recent_blocks.range((
482 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
483 Excluded(BlockRef::new(
484 round + 1,
485 AuthorityIndex::ZERO,
486 BlockDigest::MIN,
487 )),
488 )) {
489 blocks.push(block_info.block.clone())
490 }
491 blocks
492 }
493
494 pub(crate) fn ancestors_at_round(
496 &self,
497 later_block: &VerifiedBlock,
498 earlier_round: Round,
499 ) -> Vec<VerifiedBlock> {
500 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
502 while !linked.is_empty() {
503 let round = linked.last().unwrap().round;
504 if round <= earlier_round {
506 break;
507 }
508 let block_ref = linked.pop_last().unwrap();
509 let Some(block) = self.get_block(&block_ref) else {
510 panic!("Block {:?} should exist in DAG!", block_ref);
511 };
512 linked.extend(block.ancestors().iter().cloned());
513 }
514 linked
515 .range((
516 Included(BlockRef::new(
517 earlier_round,
518 AuthorityIndex::ZERO,
519 BlockDigest::MIN,
520 )),
521 Unbounded,
522 ))
523 .map(|r| {
524 self.get_block(r)
525 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
526 .clone()
527 })
528 .collect()
529 }
530
531 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
534 self.get_last_block_for_authority(self.context.own_index)
535 }
536
537 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
540 if let Some(last) = self.recent_refs_by_authority[authority].last() {
541 return self
542 .recent_blocks
543 .get(last)
544 .expect("Block should be found in recent blocks")
545 .block
546 .clone();
547 }
548
549 let (_, genesis_block) = self
551 .genesis
552 .iter()
553 .find(|(block_ref, _)| block_ref.author == authority)
554 .expect("Genesis should be found for authority {authority_index}");
555 genesis_block.clone()
556 }
557
558 pub(crate) fn get_cached_blocks(
564 &self,
565 authority: AuthorityIndex,
566 start: Round,
567 ) -> Vec<VerifiedBlock> {
568 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
569 }
570
571 pub(crate) fn get_cached_blocks_in_range(
574 &self,
575 authority: AuthorityIndex,
576 start_round: Round,
577 end_round: Round,
578 limit: usize,
579 ) -> Vec<VerifiedBlock> {
580 if start_round >= end_round || limit == 0 {
581 return vec![];
582 }
583
584 let mut blocks = vec![];
585 for block_ref in self.recent_refs_by_authority[authority].range((
586 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
587 Excluded(BlockRef::new(
588 end_round,
589 AuthorityIndex::MIN,
590 BlockDigest::MIN,
591 )),
592 )) {
593 let block_info = self
594 .recent_blocks
595 .get(block_ref)
596 .expect("Block should exist in recent blocks");
597 blocks.push(block_info.block.clone());
598 if blocks.len() >= limit {
599 break;
600 }
601 }
602 blocks
603 }
604
605 pub(crate) fn get_last_cached_block_in_range(
607 &self,
608 authority: AuthorityIndex,
609 start_round: Round,
610 end_round: Round,
611 ) -> Option<VerifiedBlock> {
612 if start_round >= end_round {
613 return None;
614 }
615
616 let block_ref = self.recent_refs_by_authority[authority]
617 .range((
618 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
619 Excluded(BlockRef::new(
620 end_round,
621 AuthorityIndex::MIN,
622 BlockDigest::MIN,
623 )),
624 ))
625 .last()?;
626
627 self.recent_blocks
628 .get(block_ref)
629 .map(|block_info| block_info.block.clone())
630 }
631
632 pub(crate) fn get_last_cached_block_per_authority(
639 &self,
640 end_round: Round,
641 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
642 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
644 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
645
646 if end_round == GENESIS_ROUND {
647 panic!(
648 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
649 );
650 }
651
652 if end_round == GENESIS_ROUND + 1 {
653 return blocks.into_iter().map(|b| (b, vec![])).collect();
654 }
655
656 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
657 let authority_index = self
658 .context
659 .committee
660 .to_authority_index(authority_index)
661 .unwrap();
662
663 let last_evicted_round = self.evicted_rounds[authority_index];
664 if end_round.saturating_sub(1) <= last_evicted_round {
665 panic!(
666 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
667 );
668 }
669
670 let block_ref_iter = block_refs
671 .range((
672 Included(BlockRef::new(
673 last_evicted_round + 1,
674 authority_index,
675 BlockDigest::MIN,
676 )),
677 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
678 ))
679 .rev();
680
681 let mut last_round = 0;
682 for block_ref in block_ref_iter {
683 if last_round == 0 {
684 last_round = block_ref.round;
685 let block_info = self
686 .recent_blocks
687 .get(block_ref)
688 .expect("Block should exist in recent blocks");
689 blocks[authority_index] = block_info.block.clone();
690 continue;
691 }
692 if block_ref.round < last_round {
693 break;
694 }
695 equivocating_blocks[authority_index].push(*block_ref);
696 }
697 }
698
699 blocks.into_iter().zip(equivocating_blocks).collect()
700 }
701
702 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
705 if slot.round == GENESIS_ROUND {
707 return true;
708 }
709
710 let eviction_round = self.evicted_rounds[slot.authority];
711 if slot.round <= eviction_round {
712 panic!(
713 "{}",
714 format!(
715 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
716 )
717 );
718 }
719
720 let mut result = self.recent_refs_by_authority[slot.authority].range((
721 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
722 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
723 ));
724 result.next().is_some()
725 }
726
727 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730 let mut exist = vec![false; block_refs.len()];
731 let mut missing = Vec::new();
732
733 for (index, block_ref) in block_refs.into_iter().enumerate() {
734 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736 exist[index] = true;
737 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738 {
739 exist[index] = false;
743 } else {
744 missing.push((index, block_ref));
745 }
746 }
747
748 if missing.is_empty() {
749 return exist;
750 }
751
752 let missing_refs = missing
753 .iter()
754 .map(|(_, block_ref)| *block_ref)
755 .collect::<Vec<_>>();
756 let store_results = self
757 .store
758 .contains_blocks(&missing_refs)
759 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
760 self.context
761 .metrics
762 .node_metrics
763 .dag_state_store_read_count
764 .with_label_values(&["contains_blocks"])
765 .inc();
766
767 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
768 exist[index] = result;
769 }
770
771 exist
772 }
773
774 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
775 let blocks = self.contains_blocks(vec![*block_ref]);
776 blocks.first().cloned().unwrap()
777 }
778
779 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
782 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
783 if !block_info.committed {
784 block_info.committed = true;
785 return true;
786 }
787 false
788 } else {
789 panic!(
790 "Block {:?} not found in cache to set as committed.",
791 block_ref
792 );
793 }
794 }
795
796 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
798 self.recent_blocks
799 .get(block_ref)
800 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
801 .committed
802 }
803
804 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
810 let gc_round = self.gc_round();
811 let mut linked_blocks = vec![];
812 let mut targets = VecDeque::new();
813 targets.push_back(root_block);
814 while let Some(block_ref) = targets.pop_front() {
815 if block_ref.round <= gc_round {
823 continue;
824 }
825 let block_info = self
826 .recent_blocks
827 .get_mut(&block_ref)
828 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
829 if block_info.included {
830 continue;
831 }
832 linked_blocks.push(block_ref);
833 block_info.included = true;
834 targets.extend(block_info.block.ancestors().iter());
835 }
836 linked_blocks
837 }
838
839 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
842 self.recent_blocks
843 .get(block_ref)
844 .unwrap_or_else(|| {
845 panic!(
846 "Attempted to query for inclusion status for a block not in cached data {}",
847 block_ref
848 )
849 })
850 .included
851 }
852
853 pub(crate) fn threshold_clock_round(&self) -> Round {
854 self.threshold_clock.get_round()
855 }
856
857 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
859 self.threshold_clock.get_quorum_ts()
860 }
861
862 pub(crate) fn highest_accepted_round(&self) -> Round {
863 self.highest_accepted_round
864 }
865
866 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
869 let time_diff = if let Some(last_commit) = &self.last_commit {
870 if commit.index() <= last_commit.index() {
871 error!(
872 "New commit index {} <= last commit index {}!",
873 commit.index(),
874 last_commit.index()
875 );
876 return;
877 }
878 assert_eq!(commit.index(), last_commit.index() + 1);
879
880 if commit.timestamp_ms() < last_commit.timestamp_ms() {
881 panic!(
882 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
883 last_commit, commit
884 );
885 }
886 commit
887 .timestamp_ms()
888 .saturating_sub(last_commit.timestamp_ms())
889 } else {
890 assert_eq!(commit.index(), 1);
891 0
892 };
893
894 self.context
895 .metrics
896 .node_metrics
897 .last_commit_time_diff
898 .observe(time_diff as f64);
899
900 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
901 previous_commit.round() < commit.round()
902 } else {
903 true
904 };
905
906 self.last_commit = Some(commit.clone());
907
908 if commit_round_advanced {
909 let now = std::time::Instant::now();
910 if let Some(previous_time) = self.last_commit_round_advancement_time {
911 self.context
912 .metrics
913 .node_metrics
914 .commit_round_advancement_interval
915 .observe(now.duration_since(previous_time).as_secs_f64())
916 }
917 self.last_commit_round_advancement_time = Some(now);
918 }
919
920 for block_ref in commit.blocks().iter() {
921 self.last_committed_rounds[block_ref.author] = max(
922 self.last_committed_rounds[block_ref.author],
923 block_ref.round,
924 );
925 }
926
927 for (i, round) in self.last_committed_rounds.iter().enumerate() {
928 let index = self.context.committee.to_authority_index(i).unwrap();
929 let hostname = &self.context.committee.authority(index).hostname;
930 self.context
931 .metrics
932 .node_metrics
933 .last_committed_authority_round
934 .with_label_values(&[hostname])
935 .set((*round).into());
936 }
937
938 self.pending_commit_votes.push_back(commit.reference());
939 self.commits_to_write.push(commit);
940 }
941
942 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
944 self.commits_to_write.extend(commits);
945 }
946
947 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
948 assert!(
949 self.commits_to_write.is_empty(),
950 "Commits to write should be empty. {:?}",
951 self.commits_to_write,
952 );
953 }
954
955 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
956 assert!(self.scoring_subdag.is_empty());
960
961 let commit_info = CommitInfo {
962 committed_rounds: self.last_committed_rounds.clone(),
963 reputation_scores,
964 };
965 let last_commit = self
966 .last_commit
967 .as_ref()
968 .expect("Last commit should already be set.");
969 self.commit_info_to_write
970 .push((last_commit.reference(), commit_info));
971 }
972
973 pub(crate) fn add_finalized_commit(
974 &mut self,
975 commit_ref: CommitRef,
976 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
977 ) {
978 self.finalized_commits_to_write
979 .push((commit_ref, rejected_transactions));
980 }
981
982 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
983 let mut votes = Vec::new();
984 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
985 votes.push(self.pending_commit_votes.pop_front().unwrap());
986 }
987 votes
988 }
989
990 pub(crate) fn last_commit_index(&self) -> CommitIndex {
992 match &self.last_commit {
993 Some(commit) => commit.index(),
994 None => 0,
995 }
996 }
997
998 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1000 match &self.last_commit {
1001 Some(commit) => commit.digest(),
1002 None => CommitDigest::MIN,
1003 }
1004 }
1005
1006 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1008 match &self.last_commit {
1009 Some(commit) => commit.timestamp_ms(),
1010 None => 0,
1011 }
1012 }
1013
1014 pub(crate) fn last_commit_leader(&self) -> Slot {
1016 match &self.last_commit {
1017 Some(commit) => commit.leader().into(),
1018 None => self
1019 .genesis
1020 .iter()
1021 .next()
1022 .map(|(genesis_ref, _)| *genesis_ref)
1023 .expect("Genesis blocks should always be available.")
1024 .into(),
1025 }
1026 }
1027
1028 pub(crate) fn last_commit_round(&self) -> Round {
1030 match &self.last_commit {
1031 Some(commit) => commit.leader().round,
1032 None => 0,
1033 }
1034 }
1035
1036 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1038 self.last_committed_rounds.clone()
1039 }
1040
1041 pub(crate) fn gc_round(&self) -> Round {
1045 self.calculate_gc_round(self.last_commit_round())
1046 }
1047
1048 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1051 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1052 }
1053
1054 pub(crate) fn flush(&mut self) {
1064 let _s = self
1065 .context
1066 .metrics
1067 .node_metrics
1068 .scope_processing_time
1069 .with_label_values(&["DagState::flush"])
1070 .start_timer();
1071
1072 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1074 let pending_commits = std::mem::take(&mut self.commits_to_write);
1075 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1076 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1077 if pending_blocks.is_empty()
1078 && pending_commits.is_empty()
1079 && pending_commit_info.is_empty()
1080 && pending_finalized_commits.is_empty()
1081 {
1082 return;
1083 }
1084
1085 debug!(
1086 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1087 pending_blocks.len(),
1088 pending_blocks
1089 .iter()
1090 .map(|b| b.reference().to_string())
1091 .join(","),
1092 pending_commits.len(),
1093 pending_commits
1094 .iter()
1095 .map(|c| c.reference().to_string())
1096 .join(","),
1097 pending_commit_info.len(),
1098 pending_commit_info
1099 .iter()
1100 .map(|(commit_ref, _)| commit_ref.to_string())
1101 .join(","),
1102 pending_finalized_commits.len(),
1103 pending_finalized_commits
1104 .iter()
1105 .map(|(commit_ref, _)| commit_ref.to_string())
1106 .join(","),
1107 );
1108 self.store
1109 .write(WriteBatch::new(
1110 pending_blocks,
1111 pending_commits,
1112 pending_commit_info,
1113 pending_finalized_commits,
1114 ))
1115 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1116 self.context
1117 .metrics
1118 .node_metrics
1119 .dag_state_store_write_count
1120 .inc();
1121
1122 for (authority_index, _) in self.context.committee.authorities() {
1124 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1125 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1126 if block_ref.round <= eviction_round {
1127 self.recent_blocks.remove(block_ref);
1128 self.recent_refs_by_authority[authority_index].pop_first();
1129 } else {
1130 break;
1131 }
1132 }
1133 self.evicted_rounds[authority_index] = eviction_round;
1134 }
1135
1136 let metrics = &self.context.metrics.node_metrics;
1137 metrics
1138 .dag_state_recent_blocks
1139 .set(self.recent_blocks.len() as i64);
1140 metrics.dag_state_recent_refs.set(
1141 self.recent_refs_by_authority
1142 .iter()
1143 .map(BTreeSet::len)
1144 .sum::<usize>() as i64,
1145 );
1146 }
1147
1148 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1149 self.store
1150 .read_last_commit_info()
1151 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1152 }
1153
1154 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1155 self.scoring_subdag.add_subdags(scoring_subdags);
1156 }
1157
1158 pub(crate) fn clear_scoring_subdag(&mut self) {
1159 self.scoring_subdag.clear();
1160 }
1161
1162 pub(crate) fn scoring_subdags_count(&self) -> usize {
1163 self.scoring_subdag.scored_subdags_count()
1164 }
1165
1166 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1167 self.scoring_subdag.is_empty()
1168 }
1169
1170 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1171 self.scoring_subdag.calculate_distributed_vote_scores()
1172 }
1173
1174 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1175 self.scoring_subdag
1176 .commit_range
1177 .as_ref()
1178 .expect("commit range should exist for scoring subdag")
1179 .end()
1180 }
1181
1182 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1186 let last_round = self.recent_refs_by_authority[authority_index]
1187 .last()
1188 .map(|block_ref| block_ref.round)
1189 .unwrap_or(GENESIS_ROUND);
1190
1191 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1192 }
1193
1194 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1197 gc_round.min(last_round.saturating_sub(cached_rounds))
1198 }
1199
1200 pub(crate) fn store(&self) -> Arc<dyn Store> {
1202 self.store.clone()
1203 }
1204
1205 #[cfg(test)]
1208 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1209 for round in
1212 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213 {
1214 if round == GENESIS_ROUND {
1215 return self.genesis_blocks();
1216 }
1217 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220 let blocks = self.get_uncommitted_blocks_at_round(round);
1222 for block in &blocks {
1223 if quorum.add(block.author(), &self.context.committee) {
1224 return blocks;
1225 }
1226 }
1227 }
1228
1229 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1230 }
1231
1232 #[cfg(test)]
1233 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1234 self.genesis.values().cloned().collect()
1235 }
1236
1237 #[cfg(test)]
1238 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1239 self.last_commit = Some(commit);
1240 }
1241}
1242
1243struct BlockInfo {
1244 block: VerifiedBlock,
1245 committed: bool,
1247 included: bool,
1254}
1255
1256impl BlockInfo {
1257 fn new(block: VerifiedBlock) -> Self {
1258 Self {
1259 block,
1260 committed: false,
1261 included: false,
1262 }
1263 }
1264}
1265
1266#[cfg(test)]
1267mod test {
1268 use std::vec;
1269
1270 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1271 use parking_lot::RwLock;
1272
1273 use super::*;
1274 use crate::{
1275 block::{TestBlock, VerifiedBlock},
1276 storage::{WriteBatch, mem_store::MemStore},
1277 test_dag_builder::DagBuilder,
1278 test_dag_parser::parse_dag,
1279 };
1280
1281 #[tokio::test]
1282 async fn test_get_blocks() {
1283 let (context, _) = Context::new_for_test(4);
1284 let context = Arc::new(context);
1285 let store = Arc::new(MemStore::new());
1286 let mut dag_state = DagState::new(context.clone(), store.clone());
1287 let own_index = AuthorityIndex::new_for_test(0);
1288
1289 let num_rounds: u32 = 10;
1291 let non_existent_round: u32 = 100;
1292 let num_authorities: u32 = 3;
1293 let num_blocks_per_slot: usize = 3;
1294 let mut blocks = BTreeMap::new();
1295 for round in 1..=num_rounds {
1296 for author in 0..num_authorities {
1297 let base_ts = round as BlockTimestampMs * 1000;
1299 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1300 let block = VerifiedBlock::new_for_test(
1301 TestBlock::new(round, author)
1302 .set_timestamp_ms(timestamp)
1303 .build(),
1304 );
1305 dag_state.accept_block(block.clone());
1306 blocks.insert(block.reference(), block);
1307
1308 if AuthorityIndex::new_for_test(author) == own_index {
1310 break;
1311 }
1312 }
1313 }
1314 }
1315
1316 for (r, block) in &blocks {
1318 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1319 }
1320
1321 let last_ref = blocks.keys().last().unwrap();
1323 assert!(
1324 dag_state
1325 .get_block(&BlockRef::new(
1326 last_ref.round,
1327 last_ref.author,
1328 BlockDigest::MIN
1329 ))
1330 .is_none()
1331 );
1332
1333 for round in 1..=num_rounds {
1335 for author in 0..num_authorities {
1336 let slot = Slot::new(
1337 round,
1338 context
1339 .committee
1340 .to_authority_index(author as usize)
1341 .unwrap(),
1342 );
1343 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1344
1345 if AuthorityIndex::new_for_test(author) == own_index {
1347 assert_eq!(blocks.len(), 1);
1348 } else {
1349 assert_eq!(blocks.len(), num_blocks_per_slot);
1350 }
1351
1352 for b in blocks {
1353 assert_eq!(b.round(), round);
1354 assert_eq!(
1355 b.author(),
1356 context
1357 .committee
1358 .to_authority_index(author as usize)
1359 .unwrap()
1360 );
1361 }
1362 }
1363 }
1364
1365 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1367 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1368
1369 for round in 1..=num_rounds {
1371 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1372 assert_eq!(
1375 blocks.len(),
1376 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1377 );
1378 for b in blocks {
1379 assert_eq!(b.round(), round);
1380 }
1381 }
1382
1383 assert!(
1385 dag_state
1386 .get_uncommitted_blocks_at_round(non_existent_round)
1387 .is_empty()
1388 );
1389 }
1390
1391 #[tokio::test]
1392 async fn test_ancestors_at_uncommitted_round() {
1393 let (context, _) = Context::new_for_test(4);
1395 let context = Arc::new(context);
1396 let store = Arc::new(MemStore::new());
1397 let mut dag_state = DagState::new(context.clone(), store.clone());
1398
1399 let round_10_refs: Vec<_> = (0..4)
1403 .map(|a| {
1404 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1405 .reference()
1406 })
1407 .collect();
1408
1409 let round_11 = [
1411 VerifiedBlock::new_for_test(
1413 TestBlock::new(11, 0)
1414 .set_timestamp_ms(1100)
1415 .set_ancestors(round_10_refs.clone())
1416 .build(),
1417 ),
1418 VerifiedBlock::new_for_test(
1421 TestBlock::new(11, 1)
1422 .set_timestamp_ms(1110)
1423 .set_ancestors(round_10_refs.clone())
1424 .build(),
1425 ),
1426 VerifiedBlock::new_for_test(
1428 TestBlock::new(11, 1)
1429 .set_timestamp_ms(1111)
1430 .set_ancestors(round_10_refs.clone())
1431 .build(),
1432 ),
1433 VerifiedBlock::new_for_test(
1435 TestBlock::new(11, 1)
1436 .set_timestamp_ms(1112)
1437 .set_ancestors(round_10_refs.clone())
1438 .build(),
1439 ),
1440 VerifiedBlock::new_for_test(
1442 TestBlock::new(11, 2)
1443 .set_timestamp_ms(1120)
1444 .set_ancestors(round_10_refs.clone())
1445 .build(),
1446 ),
1447 VerifiedBlock::new_for_test(
1449 TestBlock::new(11, 3)
1450 .set_timestamp_ms(1130)
1451 .set_ancestors(round_10_refs.clone())
1452 .build(),
1453 ),
1454 ];
1455
1456 let ancestors_for_round_12 = vec![
1458 round_11[0].reference(),
1459 round_11[1].reference(),
1460 round_11[5].reference(),
1461 ];
1462 let round_12 = [
1463 VerifiedBlock::new_for_test(
1464 TestBlock::new(12, 0)
1465 .set_timestamp_ms(1200)
1466 .set_ancestors(ancestors_for_round_12.clone())
1467 .build(),
1468 ),
1469 VerifiedBlock::new_for_test(
1470 TestBlock::new(12, 2)
1471 .set_timestamp_ms(1220)
1472 .set_ancestors(ancestors_for_round_12.clone())
1473 .build(),
1474 ),
1475 VerifiedBlock::new_for_test(
1476 TestBlock::new(12, 3)
1477 .set_timestamp_ms(1230)
1478 .set_ancestors(ancestors_for_round_12.clone())
1479 .build(),
1480 ),
1481 ];
1482
1483 let ancestors_for_round_13 = vec![
1485 round_12[0].reference(),
1486 round_12[1].reference(),
1487 round_12[2].reference(),
1488 round_11[2].reference(),
1489 ];
1490 let round_13 = [
1491 VerifiedBlock::new_for_test(
1492 TestBlock::new(12, 1)
1493 .set_timestamp_ms(1300)
1494 .set_ancestors(ancestors_for_round_13.clone())
1495 .build(),
1496 ),
1497 VerifiedBlock::new_for_test(
1498 TestBlock::new(12, 2)
1499 .set_timestamp_ms(1320)
1500 .set_ancestors(ancestors_for_round_13.clone())
1501 .build(),
1502 ),
1503 VerifiedBlock::new_for_test(
1504 TestBlock::new(12, 3)
1505 .set_timestamp_ms(1330)
1506 .set_ancestors(ancestors_for_round_13.clone())
1507 .build(),
1508 ),
1509 ];
1510
1511 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1513 let anchor = VerifiedBlock::new_for_test(
1514 TestBlock::new(14, 1)
1515 .set_timestamp_ms(1410)
1516 .set_ancestors(ancestors_for_round_14)
1517 .build(),
1518 );
1519
1520 for b in round_11
1522 .iter()
1523 .chain(round_12.iter())
1524 .chain(round_13.iter())
1525 .chain([anchor.clone()].iter())
1526 {
1527 dag_state.accept_block(b.clone());
1528 }
1529
1530 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1532 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1533 ancestors_refs.sort();
1534 let mut expected_refs = vec![
1535 round_11[0].reference(),
1536 round_11[1].reference(),
1537 round_11[2].reference(),
1538 round_11[5].reference(),
1539 ];
1540 expected_refs.sort(); assert_eq!(
1542 ancestors_refs, expected_refs,
1543 "Expected round 11 ancestors: {:?}. Got: {:?}",
1544 expected_refs, ancestors_refs
1545 );
1546 }
1547
1548 #[tokio::test]
1549 async fn test_link_causal_history() {
1550 let (mut context, _) = Context::new_for_test(4);
1551 context.parameters.dag_state_cached_rounds = 10;
1552 context
1553 .protocol_config
1554 .set_consensus_gc_depth_for_testing(3);
1555 let context = Arc::new(context);
1556
1557 let store = Arc::new(MemStore::new());
1558 let mut dag_state = DagState::new(context.clone(), store.clone());
1559
1560 let mut dag_builder = DagBuilder::new(context.clone());
1562 dag_builder.layers(1..=3).build();
1563 dag_builder
1564 .layers(4..=6)
1565 .authorities(vec![AuthorityIndex::new_for_test(0)])
1566 .skip_block()
1567 .build();
1568
1569 let all_blocks = dag_builder.all_blocks();
1571 dag_state.accept_blocks(all_blocks.clone());
1572
1573 for block in &all_blocks {
1575 assert!(!dag_state.has_been_included(&block.reference()));
1576 }
1577
1578 let round_1_block = &all_blocks[1];
1580 assert_eq!(round_1_block.round(), 1);
1581 let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1582
1583 assert_eq!(linked_blocks.len(), 1);
1585 assert_eq!(linked_blocks[0], round_1_block.reference());
1586 for block_ref in linked_blocks {
1587 assert!(dag_state.has_been_included(&block_ref));
1588 }
1589
1590 let round_2_block = &all_blocks[4];
1592 assert_eq!(round_2_block.round(), 2);
1593 let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1594
1595 assert_eq!(linked_blocks.len(), 4);
1597 for block_ref in linked_blocks {
1598 assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1599 }
1600
1601 for block in &all_blocks {
1603 if block.round() == 1 || block.reference() == round_2_block.reference() {
1604 assert!(dag_state.has_been_included(&block.reference()));
1605 } else {
1606 assert!(!dag_state.has_been_included(&block.reference()));
1607 }
1608 }
1609
1610 let round_6_block = all_blocks.last().unwrap();
1612 assert_eq!(round_6_block.round(), 6);
1613
1614 let last_commit = TrustedCommit::new_for_test(
1616 6,
1617 CommitDigest::MIN,
1618 context.clock.timestamp_utc_ms(),
1619 round_6_block.reference(),
1620 vec![],
1621 );
1622 dag_state.set_last_commit(last_commit);
1623 assert_eq!(
1624 dag_state.gc_round(),
1625 3,
1626 "GC round should have moved to round 3"
1627 );
1628
1629 let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1631
1632 assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1634 for block_ref in linked_blocks {
1635 assert!(
1636 block_ref.round == 4
1637 || block_ref.round == 5
1638 || block_ref == round_6_block.reference()
1639 );
1640 }
1641
1642 for block in &all_blocks {
1644 let block_ref = block.reference();
1645 if block.round() == 1
1646 || block_ref == round_2_block.reference()
1647 || block_ref.round == 4
1648 || block_ref.round == 5
1649 || block_ref == round_6_block.reference()
1650 {
1651 assert!(dag_state.has_been_included(&block.reference()));
1652 } else {
1653 assert!(!dag_state.has_been_included(&block.reference()));
1654 }
1655 }
1656 }
1657
1658 #[tokio::test]
1659 async fn test_contains_blocks_in_cache_or_store() {
1660 const CACHED_ROUNDS: Round = 2;
1662
1663 let (mut context, _) = Context::new_for_test(4);
1664 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1665
1666 let context = Arc::new(context);
1667 let store = Arc::new(MemStore::new());
1668 let mut dag_state = DagState::new(context.clone(), store.clone());
1669
1670 let num_rounds: u32 = 10;
1672 let num_authorities: u32 = 4;
1673 let mut blocks = Vec::new();
1674
1675 for round in 1..=num_rounds {
1676 for author in 0..num_authorities {
1677 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1678 blocks.push(block);
1679 }
1680 }
1681
1682 blocks.clone().into_iter().for_each(|block| {
1684 if block.round() <= 4 {
1685 store
1686 .write(WriteBatch::default().blocks(vec![block]))
1687 .unwrap();
1688 } else {
1689 dag_state.accept_blocks(vec![block]);
1690 }
1691 });
1692
1693 let mut block_refs = blocks
1696 .iter()
1697 .map(|block| block.reference())
1698 .collect::<Vec<_>>();
1699 let result = dag_state.contains_blocks(block_refs.clone());
1700
1701 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1703 assert_eq!(result, expected);
1704
1705 block_refs.insert(
1707 3,
1708 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1709 );
1710 let result = dag_state.contains_blocks(block_refs.clone());
1711
1712 expected.insert(3, false);
1714 assert_eq!(result, expected.clone());
1715 }
1716
1717 #[tokio::test]
1718 async fn test_contains_cached_block_at_slot() {
1719 const CACHED_ROUNDS: Round = 2;
1721
1722 let num_authorities: u32 = 4;
1723 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1724 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1725
1726 let context = Arc::new(context);
1727 let store = Arc::new(MemStore::new());
1728 let mut dag_state = DagState::new(context.clone(), store.clone());
1729
1730 let num_rounds: u32 = 10;
1732 let mut blocks = Vec::new();
1733
1734 for round in 1..=num_rounds {
1735 for author in 0..num_authorities {
1736 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1737 blocks.push(block.clone());
1738 dag_state.accept_block(block);
1739 }
1740 }
1741
1742 for (author, _) in context.committee.authorities() {
1744 assert!(
1745 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1746 "Genesis should always be found"
1747 );
1748 }
1749
1750 let mut block_refs = blocks
1753 .iter()
1754 .map(|block| block.reference())
1755 .collect::<Vec<_>>();
1756
1757 for block_ref in block_refs.clone() {
1758 let slot = block_ref.into();
1759 let found = dag_state.contains_cached_block_at_slot(slot);
1760 assert!(found, "A block should be found at slot {}", slot);
1761 }
1762
1763 block_refs.insert(
1766 3,
1767 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1768 );
1769 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1770 expected.insert(3, false);
1771
1772 for block_ref in block_refs {
1774 let slot = block_ref.into();
1775 let found = dag_state.contains_cached_block_at_slot(slot);
1776
1777 assert_eq!(expected.remove(0), found);
1778 }
1779 }
1780
1781 #[tokio::test]
1782 #[ignore]
1783 #[should_panic(
1784 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1785 )]
1786 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1787 const GC_DEPTH: u32 = 2;
1790 const CACHED_ROUNDS: Round = 3;
1792
1793 let (mut context, _) = Context::new_for_test(4);
1794 context
1795 .protocol_config
1796 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1797 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1798
1799 let context = Arc::new(context);
1800 let store = Arc::new(MemStore::new());
1801 let mut dag_state = DagState::new(context.clone(), store.clone());
1802
1803 let mut dag_builder = DagBuilder::new(context.clone());
1805 dag_builder.layers(1..=3).build();
1806 dag_builder
1807 .layers(4..=6)
1808 .authorities(vec![AuthorityIndex::new_for_test(0)])
1809 .skip_block()
1810 .build();
1811
1812 dag_builder
1814 .all_blocks()
1815 .into_iter()
1816 .for_each(|block| dag_state.accept_block(block));
1817
1818 dag_state.add_commit(TrustedCommit::new_for_test(
1820 1 as CommitIndex,
1821 CommitDigest::MIN,
1822 0,
1823 dag_builder.leader_block(5).unwrap().reference(),
1824 vec![],
1825 ));
1826 dag_state.flush();
1828
1829 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1831
1832 for authority_index in 1..=3 {
1836 for round in 4..=6 {
1837 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1838 round,
1839 AuthorityIndex::new_for_test(authority_index)
1840 )));
1841 }
1842 }
1843
1844 for round in 1..=3 {
1845 assert!(
1846 dag_state.contains_cached_block_at_slot(Slot::new(
1847 round,
1848 AuthorityIndex::new_for_test(0)
1849 ))
1850 );
1851 }
1852
1853 let _ =
1856 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1857 }
1858
1859 #[tokio::test]
1860 async fn test_get_blocks_in_cache_or_store() {
1861 let (context, _) = Context::new_for_test(4);
1862 let context = Arc::new(context);
1863 let store = Arc::new(MemStore::new());
1864 let mut dag_state = DagState::new(context.clone(), store.clone());
1865
1866 let num_rounds: u32 = 10;
1868 let num_authorities: u32 = 4;
1869 let mut blocks = Vec::new();
1870
1871 for round in 1..=num_rounds {
1872 for author in 0..num_authorities {
1873 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1874 blocks.push(block);
1875 }
1876 }
1877
1878 blocks.clone().into_iter().for_each(|block| {
1880 if block.round() <= 4 {
1881 store
1882 .write(WriteBatch::default().blocks(vec![block]))
1883 .unwrap();
1884 } else {
1885 dag_state.accept_blocks(vec![block]);
1886 }
1887 });
1888
1889 let mut block_refs = blocks
1892 .iter()
1893 .map(|block| block.reference())
1894 .collect::<Vec<_>>();
1895 let result = dag_state.get_blocks(&block_refs);
1896
1897 let mut expected = blocks
1898 .into_iter()
1899 .map(Some)
1900 .collect::<Vec<Option<VerifiedBlock>>>();
1901
1902 assert_eq!(result, expected.clone());
1904
1905 block_refs.insert(
1907 3,
1908 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1909 );
1910 let result = dag_state.get_blocks(&block_refs);
1911
1912 expected.insert(3, None);
1914 assert_eq!(result, expected);
1915 }
1916
1917 #[tokio::test]
1918 async fn test_flush_and_recovery() {
1919 telemetry_subscribers::init_for_testing();
1920
1921 const GC_DEPTH: u32 = 3;
1922 const CACHED_ROUNDS: u32 = 4;
1923
1924 let num_authorities: u32 = 4;
1925 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1926 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1927 context
1928 .protocol_config
1929 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1930
1931 let context = Arc::new(context);
1932
1933 let store = Arc::new(MemStore::new());
1934 let mut dag_state = DagState::new(context.clone(), store.clone());
1935
1936 const NUM_ROUNDS: Round = 20;
1937 let mut dag_builder = DagBuilder::new(context.clone());
1938 dag_builder.layers(1..=5).build();
1939 dag_builder
1940 .layers(6..=8)
1941 .authorities(vec![AuthorityIndex::new_for_test(0)])
1942 .skip_block()
1943 .build();
1944 dag_builder.layers(9..=NUM_ROUNDS).build();
1945
1946 const LAST_COMMIT_ROUND: Round = 16;
1948 const LAST_COMMIT_INDEX: CommitIndex = 15;
1949 let commits = dag_builder
1950 .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1951 .into_iter()
1952 .map(|(_subdag, commit)| commit)
1953 .take(LAST_COMMIT_INDEX as usize)
1954 .collect::<Vec<_>>();
1955 assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1956 assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1957
1958 const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1962 const NUM_PERSISTED_COMMITS: usize = 8;
1963 const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1964 const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1965 dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1966 let mut finalized_commits = vec![];
1967 for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1968 finalized_commits.push(commit.clone());
1969 dag_state.add_commit(commit);
1970 }
1971 let last_finalized_commit = finalized_commits.last().unwrap();
1972 assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1973 assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1974
1975 let finalized_blocks = finalized_commits
1977 .iter()
1978 .flat_map(|commit| commit.blocks())
1979 .collect::<BTreeSet<_>>();
1980
1981 dag_state.flush();
1983
1984 let store_blocks = store
1986 .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1987 .unwrap();
1988 assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1989 let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1990 assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1991 assert_eq!(
1992 store_commits.last().unwrap().index(),
1993 LAST_PERSISTED_COMMIT_INDEX
1994 );
1995 assert_eq!(
1996 store_commits.last().unwrap().round(),
1997 LAST_PERSISTED_COMMIT_ROUND
1998 );
1999
2000 dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
2002 for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
2003 dag_state.add_commit(commit);
2004 }
2005
2006 let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
2008 let block_refs = all_blocks
2009 .iter()
2010 .map(|block| block.reference())
2011 .collect::<Vec<_>>();
2012 let result = dag_state
2013 .get_blocks(&block_refs)
2014 .into_iter()
2015 .map(|b| b.unwrap())
2016 .collect::<Vec<_>>();
2017 assert_eq!(result, all_blocks);
2018
2019 assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2021
2022 drop(dag_state);
2024
2025 let dag_state = DagState::new(context.clone(), store.clone());
2027
2028 let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2030 let block_refs = all_blocks
2031 .iter()
2032 .map(|block| block.reference())
2033 .collect::<Vec<_>>();
2034 let result = dag_state
2035 .get_blocks(&block_refs)
2036 .into_iter()
2037 .map(|b| b.unwrap())
2038 .collect::<Vec<_>>();
2039 assert_eq!(result, all_blocks);
2040
2041 let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2043 let block_refs = missing_blocks
2044 .iter()
2045 .map(|block| block.reference())
2046 .collect::<Vec<_>>();
2047 let retrieved_blocks = dag_state
2048 .get_blocks(&block_refs)
2049 .into_iter()
2050 .flatten()
2051 .collect::<Vec<_>>();
2052 assert!(retrieved_blocks.is_empty());
2053
2054 assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2056 assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2057
2058 let expected_last_committed_rounds = vec![5, 9, 8, 8];
2060 assert_eq!(
2061 dag_state.last_committed_rounds(),
2062 expected_last_committed_rounds
2063 );
2064 assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2066
2067 for (authority_index, _) in context.committee.authorities() {
2069 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2070
2071 if authority_index == AuthorityIndex::new_for_test(0) {
2075 assert_eq!(blocks.len(), 4);
2076 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2077 assert!(
2078 blocks
2079 .into_iter()
2080 .all(|block| block.round() >= 7 && block.round() <= 12)
2081 );
2082 } else {
2083 assert_eq!(blocks.len(), 6);
2084 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2085 assert!(
2086 blocks
2087 .into_iter()
2088 .all(|block| block.round() >= 7 && block.round() <= 12)
2089 );
2090 }
2091 }
2092
2093 let gc_round = dag_state.gc_round();
2095 assert_eq!(gc_round, 6);
2096 dag_state
2097 .recent_blocks
2098 .iter()
2099 .for_each(|(block_ref, block_info)| {
2100 if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2101 assert!(
2102 block_info.committed,
2103 "Block {:?} should be set as committed",
2104 block_ref
2105 );
2106 }
2107 });
2108
2109 dag_state
2114 .recent_blocks
2115 .iter()
2116 .for_each(|(block_ref, block_info)| {
2117 if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2118 assert!(block_info.included);
2119 } else {
2120 assert!(!block_info.included);
2121 }
2122 });
2123 }
2124
2125 #[tokio::test]
2126 async fn test_block_info_as_committed() {
2127 let num_authorities: u32 = 4;
2128 let (context, _) = Context::new_for_test(num_authorities as usize);
2129 let context = Arc::new(context);
2130
2131 let store = Arc::new(MemStore::new());
2132 let mut dag_state = DagState::new(context.clone(), store.clone());
2133
2134 let block = VerifiedBlock::new_for_test(
2136 TestBlock::new(1, 0)
2137 .set_timestamp_ms(1000)
2138 .set_ancestors(vec![])
2139 .build(),
2140 );
2141
2142 dag_state.accept_block(block.clone());
2143
2144 assert!(!dag_state.is_committed(&block.reference()));
2146
2147 assert!(
2149 dag_state.set_committed(&block.reference()),
2150 "Block should be successfully set as committed for first time"
2151 );
2152
2153 assert!(dag_state.is_committed(&block.reference()));
2155
2156 assert!(
2158 !dag_state.set_committed(&block.reference()),
2159 "Block should not be successfully set as committed"
2160 );
2161 }
2162
2163 #[tokio::test]
2164 async fn test_get_cached_blocks() {
2165 let (mut context, _) = Context::new_for_test(4);
2166 context.parameters.dag_state_cached_rounds = 5;
2167
2168 let context = Arc::new(context);
2169 let store = Arc::new(MemStore::new());
2170 let mut dag_state = DagState::new(context.clone(), store.clone());
2171
2172 let mut all_blocks = Vec::new();
2177 for author in 1..=3 {
2178 for round in 10..(10 + author) {
2179 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2180 all_blocks.push(block.clone());
2181 dag_state.accept_block(block);
2182 }
2183 }
2184
2185 let cached_blocks =
2188 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2189 assert!(cached_blocks.is_empty());
2190
2191 let cached_blocks =
2192 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2193 assert_eq!(cached_blocks.len(), 1);
2194 assert_eq!(cached_blocks[0].round(), 10);
2195
2196 let cached_blocks =
2197 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2198 assert_eq!(cached_blocks.len(), 2);
2199 assert_eq!(cached_blocks[0].round(), 10);
2200 assert_eq!(cached_blocks[1].round(), 11);
2201
2202 let cached_blocks =
2203 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2204 assert_eq!(cached_blocks.len(), 1);
2205 assert_eq!(cached_blocks[0].round(), 11);
2206
2207 let cached_blocks =
2208 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2209 assert_eq!(cached_blocks.len(), 3);
2210 assert_eq!(cached_blocks[0].round(), 10);
2211 assert_eq!(cached_blocks[1].round(), 11);
2212 assert_eq!(cached_blocks[2].round(), 12);
2213
2214 let cached_blocks =
2215 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2216 assert_eq!(cached_blocks.len(), 1);
2217 assert_eq!(cached_blocks[0].round(), 12);
2218
2219 let cached_blocks = dag_state.get_cached_blocks_in_range(
2223 context.committee.to_authority_index(3).unwrap(),
2224 10,
2225 10,
2226 1,
2227 );
2228 assert!(cached_blocks.is_empty());
2229
2230 let cached_blocks = dag_state.get_cached_blocks_in_range(
2232 context.committee.to_authority_index(3).unwrap(),
2233 11,
2234 10,
2235 1,
2236 );
2237 assert!(cached_blocks.is_empty());
2238
2239 let cached_blocks = dag_state.get_cached_blocks_in_range(
2241 context.committee.to_authority_index(0).unwrap(),
2242 9,
2243 10,
2244 1,
2245 );
2246 assert!(cached_blocks.is_empty());
2247
2248 let cached_blocks = dag_state.get_cached_blocks_in_range(
2250 context.committee.to_authority_index(1).unwrap(),
2251 9,
2252 11,
2253 1,
2254 );
2255 assert_eq!(cached_blocks.len(), 1);
2256 assert_eq!(cached_blocks[0].round(), 10);
2257
2258 let cached_blocks = dag_state.get_cached_blocks_in_range(
2260 context.committee.to_authority_index(2).unwrap(),
2261 9,
2262 12,
2263 5,
2264 );
2265 assert_eq!(cached_blocks.len(), 2);
2266 assert_eq!(cached_blocks[0].round(), 10);
2267 assert_eq!(cached_blocks[1].round(), 11);
2268
2269 let cached_blocks = dag_state.get_cached_blocks_in_range(
2271 context.committee.to_authority_index(3).unwrap(),
2272 11,
2273 20,
2274 5,
2275 );
2276 assert_eq!(cached_blocks.len(), 2);
2277 assert_eq!(cached_blocks[0].round(), 11);
2278 assert_eq!(cached_blocks[1].round(), 12);
2279
2280 let cached_blocks = dag_state.get_cached_blocks_in_range(
2282 context.committee.to_authority_index(3).unwrap(),
2283 10,
2284 20,
2285 1,
2286 );
2287 assert_eq!(cached_blocks.len(), 1);
2288 assert_eq!(cached_blocks[0].round(), 10);
2289 }
2290
2291 #[tokio::test]
2292 async fn test_get_last_cached_block() {
2293 const CACHED_ROUNDS: Round = 2;
2295 const GC_DEPTH: u32 = 1;
2296 let (mut context, _) = Context::new_for_test(4);
2297 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2298 context
2299 .protocol_config
2300 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2301
2302 let context = Arc::new(context);
2303 let store = Arc::new(MemStore::new());
2304 let mut dag_state = DagState::new(context.clone(), store.clone());
2305
2306 let dag_str = "DAG {
2311 Round 0 : { 4 },
2312 Round 1 : {
2313 B -> [*],
2314 C -> [*],
2315 D -> [*],
2316 },
2317 Round 2 : {
2318 C -> [*],
2319 D -> [*],
2320 },
2321 Round 3 : {
2322 D -> [*],
2323 },
2324 }";
2325
2326 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2327
2328 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2330
2331 for block in dag_builder
2333 .all_blocks()
2334 .into_iter()
2335 .chain(std::iter::once(block))
2336 {
2337 dag_state.accept_block(block);
2338 }
2339
2340 dag_state.add_commit(TrustedCommit::new_for_test(
2341 1 as CommitIndex,
2342 CommitDigest::MIN,
2343 context.clock.timestamp_utc_ms(),
2344 dag_builder.leader_block(3).unwrap().reference(),
2345 vec![],
2346 ));
2347
2348 let end_round = 4;
2350 let expected_rounds = vec![0, 1, 2, 3];
2351 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2352 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2354 assert_eq!(
2355 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2356 expected_rounds
2357 );
2358 assert_eq!(
2359 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2360 expected_excluded_and_equivocating_blocks
2361 );
2362
2363 for (i, expected_round) in expected_rounds.iter().enumerate() {
2365 let round = dag_state
2366 .get_last_cached_block_in_range(
2367 context.committee.to_authority_index(i).unwrap(),
2368 0,
2369 end_round,
2370 )
2371 .map(|b| b.round())
2372 .unwrap_or_default();
2373 assert_eq!(round, *expected_round, "Authority {i}");
2374 }
2375
2376 let start_round = 2;
2378 let expected_rounds = [0, 0, 2, 3];
2379
2380 for (i, expected_round) in expected_rounds.iter().enumerate() {
2382 let round = dag_state
2383 .get_last_cached_block_in_range(
2384 context.committee.to_authority_index(i).unwrap(),
2385 start_round,
2386 end_round,
2387 )
2388 .map(|b| b.round())
2389 .unwrap_or_default();
2390 assert_eq!(round, *expected_round, "Authority {i}");
2391 }
2392
2393 dag_state.flush();
2399
2400 let end_round = 3;
2402 let expected_rounds = vec![0, 1, 2, 2];
2403
2404 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2406 assert_eq!(
2407 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2408 expected_rounds
2409 );
2410
2411 for (i, expected_round) in expected_rounds.iter().enumerate() {
2413 let round = dag_state
2414 .get_last_cached_block_in_range(
2415 context.committee.to_authority_index(i).unwrap(),
2416 0,
2417 end_round,
2418 )
2419 .map(|b| b.round())
2420 .unwrap_or_default();
2421 assert_eq!(round, *expected_round, "Authority {i}");
2422 }
2423 }
2424
2425 #[tokio::test]
2426 #[should_panic(
2427 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2428 )]
2429 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2430 const CACHED_ROUNDS: Round = 1;
2432 const GC_DEPTH: u32 = 1;
2433 let (mut context, _) = Context::new_for_test(4);
2434 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2435 context
2436 .protocol_config
2437 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2438
2439 let context = Arc::new(context);
2440 let store = Arc::new(MemStore::new());
2441 let mut dag_state = DagState::new(context.clone(), store.clone());
2442
2443 let mut dag_builder = DagBuilder::new(context.clone());
2448 dag_builder
2449 .layers(1..=1)
2450 .authorities(vec![AuthorityIndex::new_for_test(0)])
2451 .skip_block()
2452 .build();
2453 dag_builder
2454 .layers(2..=2)
2455 .authorities(vec![
2456 AuthorityIndex::new_for_test(0),
2457 AuthorityIndex::new_for_test(1),
2458 ])
2459 .skip_block()
2460 .build();
2461 dag_builder
2462 .layers(3..=3)
2463 .authorities(vec![
2464 AuthorityIndex::new_for_test(0),
2465 AuthorityIndex::new_for_test(1),
2466 AuthorityIndex::new_for_test(2),
2467 ])
2468 .skip_block()
2469 .build();
2470
2471 for block in dag_builder.all_blocks() {
2473 dag_state.accept_block(block);
2474 }
2475
2476 dag_state.add_commit(TrustedCommit::new_for_test(
2477 1 as CommitIndex,
2478 CommitDigest::MIN,
2479 0,
2480 dag_builder.leader_block(3).unwrap().reference(),
2481 vec![],
2482 ));
2483
2484 dag_state.flush();
2486
2487 dag_state.get_last_cached_block_per_authority(2);
2489 }
2490
2491 #[tokio::test]
2492 async fn test_last_quorum() {
2493 let (context, _) = Context::new_for_test(4);
2495 let context = Arc::new(context);
2496 let store = Arc::new(MemStore::new());
2497 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2498
2499 {
2501 let genesis = genesis_blocks(context.as_ref());
2502
2503 assert_eq!(dag_state.read().last_quorum(), genesis);
2504 }
2505
2506 {
2508 let mut dag_builder = DagBuilder::new(context.clone());
2509 dag_builder
2510 .layers(1..=4)
2511 .build()
2512 .persist_layers(dag_state.clone());
2513 let round_4_blocks: Vec<_> = dag_builder
2514 .blocks(4..=4)
2515 .into_iter()
2516 .map(|block| block.reference())
2517 .collect();
2518
2519 let last_quorum = dag_state.read().last_quorum();
2520
2521 assert_eq!(
2522 last_quorum
2523 .into_iter()
2524 .map(|block| block.reference())
2525 .collect::<Vec<_>>(),
2526 round_4_blocks
2527 );
2528 }
2529
2530 {
2532 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2533 dag_state.write().accept_block(block);
2534
2535 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2536
2537 let last_quorum = dag_state.read().last_quorum();
2538
2539 assert_eq!(last_quorum, round_4_blocks);
2540 }
2541 }
2542
2543 #[tokio::test]
2544 async fn test_last_block_for_authority() {
2545 let (context, _) = Context::new_for_test(4);
2547 let context = Arc::new(context);
2548 let store = Arc::new(MemStore::new());
2549 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2550
2551 {
2553 let genesis = genesis_blocks(context.as_ref());
2554 let my_genesis = genesis
2555 .into_iter()
2556 .find(|block| block.author() == context.own_index)
2557 .unwrap();
2558
2559 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2560 }
2561
2562 {
2564 let mut dag_builder = DagBuilder::new(context.clone());
2566 dag_builder
2567 .layers(1..=4)
2568 .build()
2569 .persist_layers(dag_state.clone());
2570
2571 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2573 dag_state.write().accept_block(block);
2574
2575 let block = dag_state
2576 .read()
2577 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2578 assert_eq!(block.round(), 5);
2579
2580 for (authority_index, _) in context.committee.authorities() {
2581 let block = dag_state
2582 .read()
2583 .get_last_block_for_authority(authority_index);
2584
2585 if authority_index.value() == 0 {
2586 assert_eq!(block.round(), 5);
2587 } else {
2588 assert_eq!(block.round(), 4);
2589 }
2590 }
2591 }
2592 }
2593
2594 #[tokio::test]
2595 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2596 let (context, _) = Context::new_for_test(4);
2598 let context = Arc::new(context);
2599 let store = Arc::new(MemStore::new());
2600 let mut dag_state = DagState::new(context.clone(), store.clone());
2601
2602 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2604
2605 let block = VerifiedBlock::new_for_test(
2606 TestBlock::new(10, 0)
2607 .set_timestamp_ms(block_timestamp)
2608 .build(),
2609 );
2610
2611 dag_state.accept_block(block);
2613 }
2614
2615 #[tokio::test]
2616 async fn test_last_finalized_commit() {
2617 let (context, _) = Context::new_for_test(4);
2619 let context = Arc::new(context);
2620 let store = Arc::new(MemStore::new());
2621 let mut dag_state = DagState::new(context.clone(), store.clone());
2622
2623 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2625 let rejected_transactions = BTreeMap::new();
2626 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2627
2628 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2630 assert_eq!(
2631 dag_state.finalized_commits_to_write[0],
2632 (commit_ref, rejected_transactions.clone())
2633 );
2634
2635 dag_state.flush();
2637
2638 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2640 assert_eq!(last_finalized_commit, Some(commit_ref));
2641 let stored_rejected_transactions = store
2642 .read_rejected_transactions(commit_ref)
2643 .unwrap()
2644 .unwrap();
2645 assert_eq!(stored_rejected_transactions, rejected_transactions);
2646 }
2647}