1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use mysten_common::ZipDebugEqIteratorExt;
18use tokio::time::Instant;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22 CommittedSubDag,
23 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
24 commit::{
25 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
26 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
27 },
28 context::Context,
29 leader_scoring::{ReputationScores, ScoringSubdag},
30 storage::{Store, WriteBatch},
31 threshold_clock::ThresholdClock,
32};
33
/// In-memory view of the consensus DAG together with commit bookkeeping and
/// buffers of data waiting to be persisted through `store`.
pub struct DagState {
    /// Shared node context; this type reads the committee, parameters, clock,
    /// metrics, protocol config and own authority index from it.
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference, built once in `new`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cached blocks with their committed/included flags. Entries are inserted on
    /// accept and recovery, and removed in `flush` once at or below the eviction round.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// References of the cached blocks, one ordered set per authority.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Receives every cached block reference; source of the threshold clock round.
    threshold_clock: ThresholdClock,

    /// Per authority, the eviction round computed during recovery and on each
    /// `flush` that had data to write.
    evicted_rounds: Vec<Round>,

    /// Highest round among all blocks cached so far.
    highest_accepted_round: Round,

    /// Most recent commit, loaded from the store or set by `add_commit`.
    last_commit: Option<TrustedCommit>,

    /// Local time at which a commit last advanced the commit round; `None` until
    /// the first such commit is added after construction.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per authority, the highest round of any block referenced by a commit.
    last_committed_rounds: Vec<Round>,

    /// Accumulates committed sub-dags used to calculate reputation scores.
    scoring_subdag: ScoringSubdag,

    /// Commit votes queued by `add_commit` and handed out by `take_commit_votes`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Accepted blocks buffered until the next `flush`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Commits buffered until the next `flush`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit info entries buffered until the next `flush`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Finalized commits and their rejected transactions, buffered until the next `flush`.
    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,

    /// Persistent storage used for recovery, cache misses and flushing.
    store: Arc<dyn Store>,

    /// Number of rounds subtracted from an authority's last round when computing
    /// its eviction round (taken from `parameters.dag_state_cached_rounds`).
    cached_rounds: Round,
}
108
109impl DagState {
110 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
112 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
113 let num_authorities = context.committee.size();
114
115 let genesis = genesis_blocks(context.as_ref())
116 .into_iter()
117 .map(|block| (block.reference(), block))
118 .collect();
119
120 let threshold_clock = ThresholdClock::new(1, context.clone());
121
122 let last_commit = store
123 .read_last_commit()
124 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
125
126 let commit_info = store
127 .read_last_commit_info()
128 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
129 let (mut last_committed_rounds, commit_recovery_start_index) =
130 if let Some((commit_ref, commit_info)) = commit_info {
131 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
132 (commit_info.committed_rounds, commit_ref.index + 1)
133 } else {
134 tracing::info!("Found no stored CommitInfo to recover from");
135 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
136 };
137
138 let mut unscored_committed_subdags = Vec::new();
139 let mut scoring_subdag = ScoringSubdag::new(context.clone());
140
141 if let Some(last_commit) = last_commit.as_ref() {
142 store
143 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
144 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
145 .iter()
146 .for_each(|commit| {
147 for block_ref in commit.blocks() {
148 last_committed_rounds[block_ref.author] =
149 max(last_committed_rounds[block_ref.author], block_ref.round);
150 }
151 let committed_subdag =
152 load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
153 unscored_committed_subdags.push(committed_subdag);
155 });
156 }
157
158 tracing::info!(
159 "DagState was initialized with the following state: \
160 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
161 unscored_committed_subdags.len()
162 );
163
164 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
165
166 let mut state = Self {
167 context: context.clone(),
168 genesis,
169 recent_blocks: BTreeMap::new(),
170 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
171 threshold_clock,
172 highest_accepted_round: 0,
173 last_commit: last_commit.clone(),
174 last_commit_round_advancement_time: None,
175 last_committed_rounds: last_committed_rounds.clone(),
176 pending_commit_votes: VecDeque::new(),
177 blocks_to_write: vec![],
178 commits_to_write: vec![],
179 commit_info_to_write: vec![],
180 finalized_commits_to_write: vec![],
181 scoring_subdag,
182 store: store.clone(),
183 cached_rounds,
184 evicted_rounds: vec![0; num_authorities],
185 };
186
187 for (authority_index, _) in context.committee.authorities() {
188 let (blocks, eviction_round) = {
189 let last_block = state
192 .store
193 .scan_last_blocks_by_author(authority_index, 1, None)
194 .expect("Database error");
195 let last_block_round = last_block
196 .last()
197 .map(|b| b.round())
198 .unwrap_or(GENESIS_ROUND);
199
200 let eviction_round =
201 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
202 let blocks = state
203 .store
204 .scan_blocks_by_author(authority_index, eviction_round + 1)
205 .expect("Database error");
206
207 (blocks, eviction_round)
208 };
209
210 state.evicted_rounds[authority_index] = eviction_round;
211
212 for block in &blocks {
214 state.update_block_metadata(block);
215 }
216
217 debug!(
218 "Recovered blocks {}: {:?}",
219 authority_index,
220 blocks
221 .iter()
222 .map(|b| b.reference())
223 .collect::<Vec<BlockRef>>()
224 );
225 }
226
227 if let Some(last_commit) = last_commit {
228 let mut index = last_commit.index();
229 let gc_round = state.gc_round();
230 info!(
231 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
232 index, gc_round
233 );
234
235 loop {
236 let commits = store
237 .scan_commits((index..=index).into())
238 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
239 let Some(commit) = commits.first() else {
240 info!("Recovering finished up to index {index}, no more commits to recover");
241 break;
242 };
243
244 if gc_round > 0 && commit.leader().round <= gc_round {
246 info!(
247 "Recovering finished, reached commit leader round {} <= gc_round {}",
248 commit.leader().round,
249 gc_round
250 );
251 break;
252 }
253
254 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
255 debug!(
256 "Setting block {:?} as committed based on commit {:?}",
257 block_ref,
258 commit.index()
259 );
260 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
261 });
262
263 index = index.saturating_sub(1);
265 if index == 0 {
266 break;
267 }
268 }
269 }
270
271 let proposed_blocks = store
273 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
274 .expect("Database error");
275 for block in proposed_blocks {
276 state.link_causal_history(block.reference());
277 }
278
279 state
280 }
281
282 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
284 assert_ne!(
285 block.round(),
286 0,
287 "Genesis block should not be accepted into DAG."
288 );
289
290 let block_ref = block.reference();
291 if self.contains_block(&block_ref) {
292 return;
293 }
294
295 let now = self.context.clock.timestamp_utc_ms();
296 if block.timestamp_ms() > now {
297 trace!(
298 "Block {:?} with timestamp {} is greater than local timestamp {}.",
299 block,
300 block.timestamp_ms(),
301 now,
302 );
303 }
304 let hostname = &self.context.committee.authority(block_ref.author).hostname;
305 self.context
306 .metrics
307 .node_metrics
308 .accepted_block_time_drift_ms
309 .with_label_values(&[hostname])
310 .inc_by(block.timestamp_ms().saturating_sub(now));
311
312 if block_ref.author == self.context.own_index {
315 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
316 if !self
317 .context
318 .parameters
319 .internal
320 .skip_equivocation_validation
321 {
322 assert!(
323 existing_blocks.is_empty(),
324 "Block Rejected! Attempted to add block {block:#?} to own slot where \
325 block(s) {existing_blocks:#?} already exists."
326 );
327 }
328 }
329 self.update_block_metadata(&block);
330 self.blocks_to_write.push(block);
331 let source = if self.context.own_index == block_ref.author {
332 "own"
333 } else {
334 "others"
335 };
336 self.context
337 .metrics
338 .node_metrics
339 .accepted_blocks
340 .with_label_values(&[source])
341 .inc();
342 }
343
344 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
346 let block_ref = block.reference();
347 self.recent_blocks
348 .insert(block_ref, BlockInfo::new(block.clone()));
349 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
350
351 if self.threshold_clock.add_block(block_ref) {
352 let last_proposed_block = self.get_last_proposed_block();
354 if last_proposed_block.round() == block_ref.round {
355 let quorum_delay_ms = self
356 .context
357 .clock
358 .timestamp_utc_ms()
359 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
360 self.context
361 .metrics
362 .node_metrics
363 .quorum_receive_latency
364 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
365 }
366 }
367
368 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
369 self.context
370 .metrics
371 .node_metrics
372 .highest_accepted_round
373 .set(self.highest_accepted_round as i64);
374
375 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
376 .last()
377 .map(|block_ref| block_ref.round)
378 .expect("There should be by now at least one block ref");
379 let hostname = &self.context.committee.authority(block_ref.author).hostname;
380 self.context
381 .metrics
382 .node_metrics
383 .highest_accepted_authority_round
384 .with_label_values(&[hostname])
385 .set(highest_accepted_round_for_author as i64);
386 }
387
388 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
390 debug!(
391 "Accepting blocks: {}",
392 blocks.iter().map(|b| b.reference().to_string()).join(",")
393 );
394 for block in blocks {
395 self.accept_block(block);
396 }
397 }
398
399 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
402 self.get_blocks(&[*reference])
403 .pop()
404 .expect("Exactly one element should be returned")
405 }
406
407 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
410 if block_refs.is_empty() {
411 return vec![];
412 }
413
414 let mut blocks = vec![None; block_refs.len()];
415 let mut missing = Vec::new();
416
417 for (index, block_ref) in block_refs.iter().enumerate() {
418 if block_ref.round == GENESIS_ROUND {
419 if let Some(block) = self.genesis.get(block_ref) {
421 blocks[index] = Some(block.clone());
422 }
423 continue;
424 }
425 if let Some(block_info) = self.recent_blocks.get(block_ref) {
426 blocks[index] = Some(block_info.block.clone());
427 continue;
428 }
429 missing.push((index, block_ref));
430 }
431
432 if missing.is_empty() {
433 return blocks;
434 }
435
436 let missing_refs = missing
437 .iter()
438 .map(|(_, block_ref)| **block_ref)
439 .collect::<Vec<_>>();
440 let store_results = self
441 .store
442 .read_blocks(&missing_refs)
443 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
444 self.context
445 .metrics
446 .node_metrics
447 .dag_state_store_read_count
448 .with_label_values(&["get_blocks"])
449 .inc();
450
451 for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
452 blocks[index] = result;
453 }
454
455 blocks
456 }
457
458 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
461 let mut blocks = vec![];
465 for (_block_ref, block_info) in self.recent_blocks.range((
466 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
467 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
468 )) {
469 blocks.push(block_info.block.clone())
470 }
471 blocks
472 }
473
474 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
477 if round <= self.last_commit_round() {
478 panic!("Round {} have committed blocks!", round);
479 }
480
481 let mut blocks = vec![];
482 for (_block_ref, block_info) in self.recent_blocks.range((
483 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
484 Excluded(BlockRef::new(
485 round + 1,
486 AuthorityIndex::ZERO,
487 BlockDigest::MIN,
488 )),
489 )) {
490 blocks.push(block_info.block.clone())
491 }
492 blocks
493 }
494
495 pub(crate) fn ancestors_at_round(
497 &self,
498 later_block: &VerifiedBlock,
499 earlier_round: Round,
500 ) -> Vec<VerifiedBlock> {
501 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
503 while !linked.is_empty() {
504 let round = linked.last().unwrap().round;
505 if round <= earlier_round {
507 break;
508 }
509 let block_ref = linked.pop_last().unwrap();
510 let Some(block) = self.get_block(&block_ref) else {
511 panic!("Block {:?} should exist in DAG!", block_ref);
512 };
513 linked.extend(block.ancestors().iter().cloned());
514 }
515 linked
516 .range((
517 Included(BlockRef::new(
518 earlier_round,
519 AuthorityIndex::ZERO,
520 BlockDigest::MIN,
521 )),
522 Unbounded,
523 ))
524 .map(|r| {
525 self.get_block(r)
526 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
527 .clone()
528 })
529 .collect()
530 }
531
532 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
535 self.get_last_block_for_authority(self.context.own_index)
536 }
537
538 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
541 if let Some(last) = self.recent_refs_by_authority[authority].last() {
542 return self
543 .recent_blocks
544 .get(last)
545 .expect("Block should be found in recent blocks")
546 .block
547 .clone();
548 }
549
550 let (_, genesis_block) = self
552 .genesis
553 .iter()
554 .find(|(block_ref, _)| block_ref.author == authority)
555 .expect("Genesis should be found for authority {authority_index}");
556 genesis_block.clone()
557 }
558
559 pub(crate) fn get_cached_blocks(
565 &self,
566 authority: AuthorityIndex,
567 start: Round,
568 ) -> Vec<VerifiedBlock> {
569 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
570 }
571
572 pub(crate) fn get_cached_blocks_in_range(
575 &self,
576 authority: AuthorityIndex,
577 start_round: Round,
578 end_round: Round,
579 limit: usize,
580 ) -> Vec<VerifiedBlock> {
581 if start_round >= end_round || limit == 0 {
582 return vec![];
583 }
584
585 let mut blocks = vec![];
586 for block_ref in self.recent_refs_by_authority[authority].range((
587 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
588 Excluded(BlockRef::new(
589 end_round,
590 AuthorityIndex::MIN,
591 BlockDigest::MIN,
592 )),
593 )) {
594 let block_info = self
595 .recent_blocks
596 .get(block_ref)
597 .expect("Block should exist in recent blocks");
598 blocks.push(block_info.block.clone());
599 if blocks.len() >= limit {
600 break;
601 }
602 }
603 blocks
604 }
605
606 pub(crate) fn get_last_cached_block_in_range(
608 &self,
609 authority: AuthorityIndex,
610 start_round: Round,
611 end_round: Round,
612 ) -> Option<VerifiedBlock> {
613 if start_round >= end_round {
614 return None;
615 }
616
617 let block_ref = self.recent_refs_by_authority[authority]
618 .range((
619 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
620 Excluded(BlockRef::new(
621 end_round,
622 AuthorityIndex::MIN,
623 BlockDigest::MIN,
624 )),
625 ))
626 .last()?;
627
628 self.recent_blocks
629 .get(block_ref)
630 .map(|block_info| block_info.block.clone())
631 }
632
633 pub(crate) fn get_last_cached_block_per_authority(
640 &self,
641 end_round: Round,
642 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
643 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
645 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
646
647 if end_round == GENESIS_ROUND {
648 panic!(
649 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
650 );
651 }
652
653 if end_round == GENESIS_ROUND + 1 {
654 return blocks.into_iter().map(|b| (b, vec![])).collect();
655 }
656
657 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
658 let authority_index = self
659 .context
660 .committee
661 .to_authority_index(authority_index)
662 .unwrap();
663
664 let last_evicted_round = self.evicted_rounds[authority_index];
665 if end_round.saturating_sub(1) <= last_evicted_round {
666 panic!(
667 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
668 );
669 }
670
671 let block_ref_iter = block_refs
672 .range((
673 Included(BlockRef::new(
674 last_evicted_round + 1,
675 authority_index,
676 BlockDigest::MIN,
677 )),
678 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
679 ))
680 .rev();
681
682 let mut last_round = 0;
683 for block_ref in block_ref_iter {
684 if last_round == 0 {
685 last_round = block_ref.round;
686 let block_info = self
687 .recent_blocks
688 .get(block_ref)
689 .expect("Block should exist in recent blocks");
690 blocks[authority_index] = block_info.block.clone();
691 continue;
692 }
693 if block_ref.round < last_round {
694 break;
695 }
696 equivocating_blocks[authority_index].push(*block_ref);
697 }
698 }
699
700 blocks
701 .into_iter()
702 .zip_debug_eq(equivocating_blocks)
703 .collect()
704 }
705
706 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
709 if slot.round == GENESIS_ROUND {
711 return true;
712 }
713
714 let eviction_round = self.evicted_rounds[slot.authority];
715 if slot.round <= eviction_round {
716 panic!(
717 "{}",
718 format!(
719 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
720 )
721 );
722 }
723
724 let mut result = self.recent_refs_by_authority[slot.authority].range((
725 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
726 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
727 ));
728 result.next().is_some()
729 }
730
731 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
734 let mut exist = vec![false; block_refs.len()];
735 let mut missing = Vec::new();
736
737 for (index, block_ref) in block_refs.into_iter().enumerate() {
738 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
739 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
740 exist[index] = true;
741 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
742 {
743 exist[index] = false;
747 } else {
748 missing.push((index, block_ref));
749 }
750 }
751
752 if missing.is_empty() {
753 return exist;
754 }
755
756 let missing_refs = missing
757 .iter()
758 .map(|(_, block_ref)| *block_ref)
759 .collect::<Vec<_>>();
760 let store_results = self
761 .store
762 .contains_blocks(&missing_refs)
763 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
764 self.context
765 .metrics
766 .node_metrics
767 .dag_state_store_read_count
768 .with_label_values(&["contains_blocks"])
769 .inc();
770
771 for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
772 exist[index] = result;
773 }
774
775 exist
776 }
777
778 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
779 let blocks = self.contains_blocks(vec![*block_ref]);
780 blocks.first().cloned().unwrap()
781 }
782
783 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
786 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
787 if !block_info.committed {
788 block_info.committed = true;
789 return true;
790 }
791 false
792 } else {
793 panic!(
794 "Block {:?} not found in cache to set as committed.",
795 block_ref
796 );
797 }
798 }
799
800 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
802 self.recent_blocks
803 .get(block_ref)
804 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
805 .committed
806 }
807
808 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
814 let gc_round = self.gc_round();
815 let mut linked_blocks = vec![];
816 let mut targets = VecDeque::new();
817 targets.push_back(root_block);
818 while let Some(block_ref) = targets.pop_front() {
819 if block_ref.round <= gc_round {
827 continue;
828 }
829 let block_info = self
830 .recent_blocks
831 .get_mut(&block_ref)
832 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
833 if block_info.included {
834 continue;
835 }
836 linked_blocks.push(block_ref);
837 block_info.included = true;
838 targets.extend(block_info.block.ancestors().iter());
839 }
840 linked_blocks
841 }
842
843 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
846 self.recent_blocks
847 .get(block_ref)
848 .unwrap_or_else(|| {
849 panic!(
850 "Attempted to query for inclusion status for a block not in cached data {}",
851 block_ref
852 )
853 })
854 .included
855 }
856
857 pub(crate) fn threshold_clock_round(&self) -> Round {
858 self.threshold_clock.get_round()
859 }
860
861 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
863 self.threshold_clock.get_quorum_ts()
864 }
865
866 pub(crate) fn highest_accepted_round(&self) -> Round {
867 self.highest_accepted_round
868 }
869
870 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
873 let time_diff = if let Some(last_commit) = &self.last_commit {
874 if commit.index() <= last_commit.index() {
875 error!(
876 "New commit index {} <= last commit index {}!",
877 commit.index(),
878 last_commit.index()
879 );
880 return;
881 }
882 assert_eq!(commit.index(), last_commit.index() + 1);
883
884 if commit.timestamp_ms() < last_commit.timestamp_ms() {
885 panic!(
886 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
887 last_commit, commit
888 );
889 }
890 commit
891 .timestamp_ms()
892 .saturating_sub(last_commit.timestamp_ms())
893 } else {
894 assert_eq!(commit.index(), 1);
895 0
896 };
897
898 self.context
899 .metrics
900 .node_metrics
901 .last_commit_time_diff
902 .observe(time_diff as f64);
903
904 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
905 previous_commit.round() < commit.round()
906 } else {
907 true
908 };
909
910 self.last_commit = Some(commit.clone());
911
912 if commit_round_advanced {
913 let now = std::time::Instant::now();
914 if let Some(previous_time) = self.last_commit_round_advancement_time {
915 self.context
916 .metrics
917 .node_metrics
918 .commit_round_advancement_interval
919 .observe(now.duration_since(previous_time).as_secs_f64())
920 }
921 self.last_commit_round_advancement_time = Some(now);
922 }
923
924 for block_ref in commit.blocks().iter() {
925 self.last_committed_rounds[block_ref.author] = max(
926 self.last_committed_rounds[block_ref.author],
927 block_ref.round,
928 );
929 }
930
931 for (i, round) in self.last_committed_rounds.iter().enumerate() {
932 let index = self.context.committee.to_authority_index(i).unwrap();
933 let hostname = &self.context.committee.authority(index).hostname;
934 self.context
935 .metrics
936 .node_metrics
937 .last_committed_authority_round
938 .with_label_values(&[hostname])
939 .set((*round).into());
940 }
941
942 self.pending_commit_votes.push_back(commit.reference());
943 self.commits_to_write.push(commit);
944 }
945
946 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
948 self.commits_to_write.extend(commits);
949 }
950
951 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
952 assert!(
953 self.commits_to_write.is_empty(),
954 "Commits to write should be empty. {:?}",
955 self.commits_to_write,
956 );
957 }
958
959 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
960 assert!(self.scoring_subdag.is_empty());
964
965 let commit_info = CommitInfo {
966 committed_rounds: self.last_committed_rounds.clone(),
967 reputation_scores,
968 };
969 let last_commit = self
970 .last_commit
971 .as_ref()
972 .expect("Last commit should already be set.");
973 self.commit_info_to_write
974 .push((last_commit.reference(), commit_info));
975 }
976
977 pub(crate) fn add_finalized_commit(
978 &mut self,
979 commit_ref: CommitRef,
980 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
981 ) {
982 self.finalized_commits_to_write
983 .push((commit_ref, rejected_transactions));
984 }
985
986 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
987 let mut votes = Vec::new();
988 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
989 votes.push(self.pending_commit_votes.pop_front().unwrap());
990 }
991 votes
992 }
993
994 pub(crate) fn last_commit_index(&self) -> CommitIndex {
996 match &self.last_commit {
997 Some(commit) => commit.index(),
998 None => 0,
999 }
1000 }
1001
1002 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1004 match &self.last_commit {
1005 Some(commit) => commit.digest(),
1006 None => CommitDigest::MIN,
1007 }
1008 }
1009
1010 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1012 match &self.last_commit {
1013 Some(commit) => commit.timestamp_ms(),
1014 None => 0,
1015 }
1016 }
1017
1018 pub(crate) fn last_commit_leader(&self) -> Slot {
1020 match &self.last_commit {
1021 Some(commit) => commit.leader().into(),
1022 None => self
1023 .genesis
1024 .iter()
1025 .next()
1026 .map(|(genesis_ref, _)| *genesis_ref)
1027 .expect("Genesis blocks should always be available.")
1028 .into(),
1029 }
1030 }
1031
1032 pub(crate) fn last_commit_round(&self) -> Round {
1034 match &self.last_commit {
1035 Some(commit) => commit.leader().round,
1036 None => 0,
1037 }
1038 }
1039
1040 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1042 self.last_committed_rounds.clone()
1043 }
1044
1045 pub(crate) fn gc_round(&self) -> Round {
1049 self.calculate_gc_round(self.last_commit_round())
1050 }
1051
1052 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1055 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1056 }
1057
1058 pub(crate) fn flush(&mut self) {
1068 let _s = self
1069 .context
1070 .metrics
1071 .node_metrics
1072 .scope_processing_time
1073 .with_label_values(&["DagState::flush"])
1074 .start_timer();
1075
1076 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1078 let pending_commits = std::mem::take(&mut self.commits_to_write);
1079 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1080 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1081 if pending_blocks.is_empty()
1082 && pending_commits.is_empty()
1083 && pending_commit_info.is_empty()
1084 && pending_finalized_commits.is_empty()
1085 {
1086 return;
1087 }
1088
1089 debug!(
1090 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1091 pending_blocks.len(),
1092 pending_blocks
1093 .iter()
1094 .map(|b| b.reference().to_string())
1095 .join(","),
1096 pending_commits.len(),
1097 pending_commits
1098 .iter()
1099 .map(|c| c.reference().to_string())
1100 .join(","),
1101 pending_commit_info.len(),
1102 pending_commit_info
1103 .iter()
1104 .map(|(commit_ref, _)| commit_ref.to_string())
1105 .join(","),
1106 pending_finalized_commits.len(),
1107 pending_finalized_commits
1108 .iter()
1109 .map(|(commit_ref, _)| commit_ref.to_string())
1110 .join(","),
1111 );
1112 self.store
1113 .write(WriteBatch::new(
1114 pending_blocks,
1115 pending_commits,
1116 pending_commit_info,
1117 pending_finalized_commits,
1118 ))
1119 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1120 self.context
1121 .metrics
1122 .node_metrics
1123 .dag_state_store_write_count
1124 .inc();
1125
1126 for (authority_index, _) in self.context.committee.authorities() {
1128 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1129 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1130 if block_ref.round <= eviction_round {
1131 self.recent_blocks.remove(block_ref);
1132 self.recent_refs_by_authority[authority_index].pop_first();
1133 } else {
1134 break;
1135 }
1136 }
1137 self.evicted_rounds[authority_index] = eviction_round;
1138 }
1139
1140 let metrics = &self.context.metrics.node_metrics;
1141 metrics
1142 .dag_state_recent_blocks
1143 .set(self.recent_blocks.len() as i64);
1144 metrics.dag_state_recent_refs.set(
1145 self.recent_refs_by_authority
1146 .iter()
1147 .map(BTreeSet::len)
1148 .sum::<usize>() as i64,
1149 );
1150 }
1151
1152 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1153 self.store
1154 .read_last_commit_info()
1155 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1156 }
1157
1158 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1159 self.scoring_subdag.add_subdags(scoring_subdags);
1160 }
1161
1162 pub(crate) fn clear_scoring_subdag(&mut self) {
1163 self.scoring_subdag.clear();
1164 }
1165
1166 pub(crate) fn scoring_subdags_count(&self) -> usize {
1167 self.scoring_subdag.scored_subdags_count()
1168 }
1169
1170 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1171 self.scoring_subdag.is_empty()
1172 }
1173
1174 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1175 self.scoring_subdag.calculate_distributed_vote_scores()
1176 }
1177
1178 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1179 self.scoring_subdag
1180 .commit_range
1181 .as_ref()
1182 .expect("commit range should exist for scoring subdag")
1183 .end()
1184 }
1185
1186 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1190 let last_round = self.recent_refs_by_authority[authority_index]
1191 .last()
1192 .map(|block_ref| block_ref.round)
1193 .unwrap_or(GENESIS_ROUND);
1194
1195 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1196 }
1197
1198 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1201 gc_round.min(last_round.saturating_sub(cached_rounds))
1202 }
1203
1204 pub(crate) fn store(&self) -> Arc<dyn Store> {
1206 self.store.clone()
1207 }
1208
/// Test helper: returns the cached blocks of the most recent round, among the
/// highest accepted round and the one before it, on which the stake aggregator
/// reports a quorum. Reaching the genesis round returns the genesis blocks.
///
/// # Panics
/// Panics if neither of the two rounds holds a quorum.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    let highest = self.highest_accepted_round;
    for round in (highest.saturating_sub(1)..=highest).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        let blocks = self.get_uncommitted_blocks_at_round(round);
        // Stops adding authors as soon as the aggregator reports success.
        let quorum_reached = blocks
            .iter()
            .any(|block| quorum.add(block.author(), &self.context.committee));
        if quorum_reached {
            return blocks;
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}
1235
/// Test helper: returns clones of all genesis blocks, in reference order.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
    self.genesis
        .iter()
        .map(|(_, block)| block.clone())
        .collect()
}
1240
/// Test helper: overrides the last commit without touching any other state.
#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
    let replacement = Some(commit);
    self.last_commit = replacement;
}
1245}
1246
/// A cached block together with the status flags `DagState` tracks for it.
struct BlockInfo {
    /// The cached block itself.
    block: VerifiedBlock,
    /// Set by `DagState::set_committed`.
    committed: bool,
    /// Set by `DagState::link_causal_history` when the block is reached while
    /// linking the causal history of a root block.
    included: bool,
}
1259
1260impl BlockInfo {
1261 fn new(block: VerifiedBlock) -> Self {
1262 Self {
1263 block,
1264 committed: false,
1265 included: false,
1266 }
1267 }
1268}
1269
1270#[cfg(test)]
1271mod test {
1272 use std::vec;
1273
1274 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1275 use parking_lot::RwLock;
1276
1277 use super::*;
1278 use crate::{
1279 block::{TestBlock, VerifiedBlock},
1280 storage::{WriteBatch, mem_store::MemStore},
1281 test_dag_builder::DagBuilder,
1282 test_dag_parser::parse_dag,
1283 };
1284
1285 #[tokio::test]
1286 async fn test_get_blocks() {
1287 let (context, _) = Context::new_for_test(4);
1288 let context = Arc::new(context);
1289 let store = Arc::new(MemStore::new());
1290 let mut dag_state = DagState::new(context.clone(), store.clone());
1291 let own_index = AuthorityIndex::new_for_test(0);
1292
1293 let num_rounds: u32 = 10;
1295 let non_existent_round: u32 = 100;
1296 let num_authorities: u32 = 3;
1297 let num_blocks_per_slot: usize = 3;
1298 let mut blocks = BTreeMap::new();
1299 for round in 1..=num_rounds {
1300 for author in 0..num_authorities {
1301 let base_ts = round as BlockTimestampMs * 1000;
1303 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1304 let block = VerifiedBlock::new_for_test(
1305 TestBlock::new(round, author)
1306 .set_timestamp_ms(timestamp)
1307 .build(),
1308 );
1309 dag_state.accept_block(block.clone());
1310 blocks.insert(block.reference(), block);
1311
1312 if AuthorityIndex::new_for_test(author) == own_index {
1314 break;
1315 }
1316 }
1317 }
1318 }
1319
1320 for (r, block) in &blocks {
1322 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1323 }
1324
1325 let last_ref = blocks.keys().last().unwrap();
1327 assert!(
1328 dag_state
1329 .get_block(&BlockRef::new(
1330 last_ref.round,
1331 last_ref.author,
1332 BlockDigest::MIN
1333 ))
1334 .is_none()
1335 );
1336
1337 for round in 1..=num_rounds {
1339 for author in 0..num_authorities {
1340 let slot = Slot::new(
1341 round,
1342 context
1343 .committee
1344 .to_authority_index(author as usize)
1345 .unwrap(),
1346 );
1347 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1348
1349 if AuthorityIndex::new_for_test(author) == own_index {
1351 assert_eq!(blocks.len(), 1);
1352 } else {
1353 assert_eq!(blocks.len(), num_blocks_per_slot);
1354 }
1355
1356 for b in blocks {
1357 assert_eq!(b.round(), round);
1358 assert_eq!(
1359 b.author(),
1360 context
1361 .committee
1362 .to_authority_index(author as usize)
1363 .unwrap()
1364 );
1365 }
1366 }
1367 }
1368
1369 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1371 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1372
1373 for round in 1..=num_rounds {
1375 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1376 assert_eq!(
1379 blocks.len(),
1380 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1381 );
1382 for b in blocks {
1383 assert_eq!(b.round(), round);
1384 }
1385 }
1386
1387 assert!(
1389 dag_state
1390 .get_uncommitted_blocks_at_round(non_existent_round)
1391 .is_empty()
1392 );
1393 }
1394
1395 #[tokio::test]
1396 async fn test_ancestors_at_uncommitted_round() {
1397 let (context, _) = Context::new_for_test(4);
1399 let context = Arc::new(context);
1400 let store = Arc::new(MemStore::new());
1401 let mut dag_state = DagState::new(context.clone(), store.clone());
1402
1403 let round_10_refs: Vec<_> = (0..4)
1407 .map(|a| {
1408 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1409 .reference()
1410 })
1411 .collect();
1412
1413 let round_11 = [
1415 VerifiedBlock::new_for_test(
1417 TestBlock::new(11, 0)
1418 .set_timestamp_ms(1100)
1419 .set_ancestors(round_10_refs.clone())
1420 .build(),
1421 ),
1422 VerifiedBlock::new_for_test(
1425 TestBlock::new(11, 1)
1426 .set_timestamp_ms(1110)
1427 .set_ancestors(round_10_refs.clone())
1428 .build(),
1429 ),
1430 VerifiedBlock::new_for_test(
1432 TestBlock::new(11, 1)
1433 .set_timestamp_ms(1111)
1434 .set_ancestors(round_10_refs.clone())
1435 .build(),
1436 ),
1437 VerifiedBlock::new_for_test(
1439 TestBlock::new(11, 1)
1440 .set_timestamp_ms(1112)
1441 .set_ancestors(round_10_refs.clone())
1442 .build(),
1443 ),
1444 VerifiedBlock::new_for_test(
1446 TestBlock::new(11, 2)
1447 .set_timestamp_ms(1120)
1448 .set_ancestors(round_10_refs.clone())
1449 .build(),
1450 ),
1451 VerifiedBlock::new_for_test(
1453 TestBlock::new(11, 3)
1454 .set_timestamp_ms(1130)
1455 .set_ancestors(round_10_refs.clone())
1456 .build(),
1457 ),
1458 ];
1459
1460 let ancestors_for_round_12 = vec![
1462 round_11[0].reference(),
1463 round_11[1].reference(),
1464 round_11[5].reference(),
1465 ];
1466 let round_12 = [
1467 VerifiedBlock::new_for_test(
1468 TestBlock::new(12, 0)
1469 .set_timestamp_ms(1200)
1470 .set_ancestors(ancestors_for_round_12.clone())
1471 .build(),
1472 ),
1473 VerifiedBlock::new_for_test(
1474 TestBlock::new(12, 2)
1475 .set_timestamp_ms(1220)
1476 .set_ancestors(ancestors_for_round_12.clone())
1477 .build(),
1478 ),
1479 VerifiedBlock::new_for_test(
1480 TestBlock::new(12, 3)
1481 .set_timestamp_ms(1230)
1482 .set_ancestors(ancestors_for_round_12.clone())
1483 .build(),
1484 ),
1485 ];
1486
1487 let ancestors_for_round_13 = vec![
1489 round_12[0].reference(),
1490 round_12[1].reference(),
1491 round_12[2].reference(),
1492 round_11[2].reference(),
1493 ];
1494 let round_13 = [
1495 VerifiedBlock::new_for_test(
1496 TestBlock::new(12, 1)
1497 .set_timestamp_ms(1300)
1498 .set_ancestors(ancestors_for_round_13.clone())
1499 .build(),
1500 ),
1501 VerifiedBlock::new_for_test(
1502 TestBlock::new(12, 2)
1503 .set_timestamp_ms(1320)
1504 .set_ancestors(ancestors_for_round_13.clone())
1505 .build(),
1506 ),
1507 VerifiedBlock::new_for_test(
1508 TestBlock::new(12, 3)
1509 .set_timestamp_ms(1330)
1510 .set_ancestors(ancestors_for_round_13.clone())
1511 .build(),
1512 ),
1513 ];
1514
1515 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1517 let anchor = VerifiedBlock::new_for_test(
1518 TestBlock::new(14, 1)
1519 .set_timestamp_ms(1410)
1520 .set_ancestors(ancestors_for_round_14)
1521 .build(),
1522 );
1523
1524 for b in round_11
1526 .iter()
1527 .chain(round_12.iter())
1528 .chain(round_13.iter())
1529 .chain([anchor.clone()].iter())
1530 {
1531 dag_state.accept_block(b.clone());
1532 }
1533
1534 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1536 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1537 ancestors_refs.sort();
1538 let mut expected_refs = vec![
1539 round_11[0].reference(),
1540 round_11[1].reference(),
1541 round_11[2].reference(),
1542 round_11[5].reference(),
1543 ];
1544 expected_refs.sort(); assert_eq!(
1546 ancestors_refs, expected_refs,
1547 "Expected round 11 ancestors: {:?}. Got: {:?}",
1548 expected_refs, ancestors_refs
1549 );
1550 }
1551
    /// Exercises `link_causal_history` on a small DAG, before and after the GC
    /// round moves, and checks which blocks are reported as included afterwards.
    #[tokio::test]
    async fn test_link_causal_history() {
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = 10;
        context.protocol_config.set_gc_depth_for_testing(3);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build rounds 1..=3 with default layer settings, then rounds 4..=6 with
        // `skip_block` applied to authority 0.
        // NOTE(review): presumably authority 0 produces no blocks in rounds 4..=6 —
        // confirm against `DagBuilder`.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layers(4..=6)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();

        // Accept every block produced by the builder.
        let all_blocks = dag_builder.all_blocks();
        dag_state.accept_blocks(all_blocks.clone());

        // Nothing has been linked yet, so no block is reported as included.
        for block in &all_blocks {
            assert!(!dag_state.has_been_included(&block.reference()));
        }

        // Linking a round 1 block returns only that block, which is then included.
        let round_1_block = &all_blocks[1];
        assert_eq!(round_1_block.round(), 1);
        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());

        assert_eq!(linked_blocks.len(), 1);
        assert_eq!(linked_blocks[0], round_1_block.reference());
        for block_ref in linked_blocks {
            assert!(dag_state.has_been_included(&block_ref));
        }

        // Linking a round 2 block returns four references: the block itself and
        // round 1 blocks.
        let round_2_block = &all_blocks[4];
        assert_eq!(round_2_block.round(), 2);
        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());

        assert_eq!(linked_blocks.len(), 4);
        for block_ref in linked_blocks {
            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
        }

        // At this point all round 1 blocks and the linked round 2 block are
        // included; everything else is not.
        for block in &all_blocks {
            if block.round() == 1 || block.reference() == round_2_block.reference() {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }

        // The last built block is a round 6 block.
        let round_6_block = all_blocks.last().unwrap();
        assert_eq!(round_6_block.round(), 6);

        // Install a commit whose leader is the round 6 block; the test expects
        // `gc_round()` to report 3 afterwards.
        let last_commit = TrustedCommit::new_for_test(
            6,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            round_6_block.reference(),
            vec![],
        );
        dag_state.set_last_commit(last_commit);
        assert_eq!(
            dag_state.gc_round(),
            3,
            "GC round should have moved to round 3"
        );

        // Linking the round 6 block now returns seven references, all from
        // rounds 4 and 5 or the round 6 block itself.
        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());

        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
        for block_ref in linked_blocks {
            assert!(
                block_ref.round == 4
                    || block_ref.round == 5
                    || block_ref == round_6_block.reference()
            );
        }

        // Final state: round 1, the linked round 2 block, rounds 4 and 5 and the
        // round 6 block are included; the remaining blocks are not.
        for block in &all_blocks {
            let block_ref = block.reference();
            if block.round() == 1
                || block_ref == round_2_block.reference()
                || block_ref.round == 4
                || block_ref.round == 5
                || block_ref == round_6_block.reference()
            {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }
    }
1659
1660 #[tokio::test]
1661 async fn test_contains_blocks_in_cache_or_store() {
1662 const CACHED_ROUNDS: Round = 2;
1664
1665 let (mut context, _) = Context::new_for_test(4);
1666 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1667
1668 let context = Arc::new(context);
1669 let store = Arc::new(MemStore::new());
1670 let mut dag_state = DagState::new(context.clone(), store.clone());
1671
1672 let num_rounds: u32 = 10;
1674 let num_authorities: u32 = 4;
1675 let mut blocks = Vec::new();
1676
1677 for round in 1..=num_rounds {
1678 for author in 0..num_authorities {
1679 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1680 blocks.push(block);
1681 }
1682 }
1683
1684 blocks.clone().into_iter().for_each(|block| {
1686 if block.round() <= 4 {
1687 store
1688 .write(WriteBatch::default().blocks(vec![block]))
1689 .unwrap();
1690 } else {
1691 dag_state.accept_blocks(vec![block]);
1692 }
1693 });
1694
1695 let mut block_refs = blocks
1698 .iter()
1699 .map(|block| block.reference())
1700 .collect::<Vec<_>>();
1701 let result = dag_state.contains_blocks(block_refs.clone());
1702
1703 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1705 assert_eq!(result, expected);
1706
1707 block_refs.insert(
1709 3,
1710 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1711 );
1712 let result = dag_state.contains_blocks(block_refs.clone());
1713
1714 expected.insert(3, false);
1716 assert_eq!(result, expected.clone());
1717 }
1718
1719 #[tokio::test]
1720 async fn test_contains_cached_block_at_slot() {
1721 const CACHED_ROUNDS: Round = 2;
1723
1724 let num_authorities: u32 = 4;
1725 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1726 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1727
1728 let context = Arc::new(context);
1729 let store = Arc::new(MemStore::new());
1730 let mut dag_state = DagState::new(context.clone(), store.clone());
1731
1732 let num_rounds: u32 = 10;
1734 let mut blocks = Vec::new();
1735
1736 for round in 1..=num_rounds {
1737 for author in 0..num_authorities {
1738 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1739 blocks.push(block.clone());
1740 dag_state.accept_block(block);
1741 }
1742 }
1743
1744 for (author, _) in context.committee.authorities() {
1746 assert!(
1747 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1748 "Genesis should always be found"
1749 );
1750 }
1751
1752 let mut block_refs = blocks
1755 .iter()
1756 .map(|block| block.reference())
1757 .collect::<Vec<_>>();
1758
1759 for block_ref in block_refs.clone() {
1760 let slot = block_ref.into();
1761 let found = dag_state.contains_cached_block_at_slot(slot);
1762 assert!(found, "A block should be found at slot {}", slot);
1763 }
1764
1765 block_refs.insert(
1768 3,
1769 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1770 );
1771 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1772 expected.insert(3, false);
1773
1774 for block_ref in block_refs {
1776 let slot = block_ref.into();
1777 let found = dag_state.contains_cached_block_at_slot(slot);
1778
1779 assert_eq!(expected.remove(0), found);
1780 }
1781 }
1782
    /// Expects `contains_cached_block_at_slot` to panic with the message given in
    /// `should_panic` when asked about round 3 of authority 1 after a commit and
    /// flush. The test is currently marked `#[ignore]`.
    #[tokio::test]
    #[ignore]
    #[should_panic(
        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
    )]
    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
        const GC_DEPTH: u32 = 2;
        const CACHED_ROUNDS: Round = 3;

        let (mut context, _) = Context::new_for_test(4);
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build rounds 1..=3 with default layer settings, then rounds 4..=6 with
        // `skip_block` applied to authority 0.
        // NOTE(review): presumably authority 0 has no blocks in rounds 4..=6 — confirm.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layers(4..=6)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();

        // Accept every built block.
        dag_builder
            .all_blocks()
            .into_iter()
            .for_each(|block| dag_state.accept_block(block));

        // Add a commit whose leader is the round 5 leader block, then flush.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            0,
            dag_builder.leader_block(5).unwrap().reference(),
            vec![],
        ));
        dag_state.flush();

        // After the commit the test expects the GC round to be 3.
        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");

        // Authorities 1..=3 must have cached blocks at rounds 4..=6.
        for authority_index in 1..=3 {
            for round in 4..=6 {
                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
                    round,
                    AuthorityIndex::new_for_test(authority_index)
                )));
            }
        }

        // Authority 0 must still have cached blocks at rounds 1..=3.
        for round in 1..=3 {
            assert!(
                dag_state.contains_cached_block_at_slot(Slot::new(
                    round,
                    AuthorityIndex::new_for_test(0)
                ))
            );
        }

        // This is the call expected to panic: round 3 for authority 1.
        let _ =
            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
    }
1858
1859 #[tokio::test]
1860 async fn test_get_blocks_in_cache_or_store() {
1861 let (context, _) = Context::new_for_test(4);
1862 let context = Arc::new(context);
1863 let store = Arc::new(MemStore::new());
1864 let mut dag_state = DagState::new(context.clone(), store.clone());
1865
1866 let num_rounds: u32 = 10;
1868 let num_authorities: u32 = 4;
1869 let mut blocks = Vec::new();
1870
1871 for round in 1..=num_rounds {
1872 for author in 0..num_authorities {
1873 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1874 blocks.push(block);
1875 }
1876 }
1877
1878 blocks.clone().into_iter().for_each(|block| {
1880 if block.round() <= 4 {
1881 store
1882 .write(WriteBatch::default().blocks(vec![block]))
1883 .unwrap();
1884 } else {
1885 dag_state.accept_blocks(vec![block]);
1886 }
1887 });
1888
1889 let mut block_refs = blocks
1892 .iter()
1893 .map(|block| block.reference())
1894 .collect::<Vec<_>>();
1895 let result = dag_state.get_blocks(&block_refs);
1896
1897 let mut expected = blocks
1898 .into_iter()
1899 .map(Some)
1900 .collect::<Vec<Option<VerifiedBlock>>>();
1901
1902 assert_eq!(result, expected.clone());
1904
1905 block_refs.insert(
1907 3,
1908 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1909 );
1910 let result = dag_state.get_blocks(&block_refs);
1911
1912 expected.insert(3, None);
1914 assert_eq!(result, expected);
1915 }
1916
    /// Flushes part of a DAG (blocks and commits) to the store, keeps adding
    /// unflushed data, then rebuilds `DagState` from the store and checks that
    /// only the flushed portion is recovered.
    #[tokio::test]
    async fn test_flush_and_recovery() {
        telemetry_subscribers::init_for_testing();

        const GC_DEPTH: u32 = 3;
        const CACHED_ROUNDS: u32 = 4;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build 20 rounds; `skip_block` is applied to authority 0 for rounds 6..=8.
        // NOTE(review): presumably authority 0 has no blocks in rounds 6..=8 — confirm.
        const NUM_ROUNDS: Round = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();
        dag_builder
            .layers(6..=8)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(9..=NUM_ROUNDS).build();

        // Take the first 15 commits of the DAG; the last of them is at round 16.
        const LAST_COMMIT_ROUND: Round = 16;
        const LAST_COMMIT_INDEX: CommitIndex = 15;
        let commits = dag_builder
            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
            .into_iter()
            .map(|(_subdag, commit)| commit)
            .take(LAST_COMMIT_INDEX as usize)
            .collect::<Vec<_>>();
        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);

        // Add the blocks of the first 12 rounds and the first 8 commits; this is
        // the portion that will be flushed.
        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
        const NUM_PERSISTED_COMMITS: usize = 8;
        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
        let mut finalized_commits = vec![];
        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
            finalized_commits.push(commit.clone());
            dag_state.add_commit(commit);
        }
        let last_finalized_commit = finalized_commits.last().unwrap();
        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);

        // Collect the blocks referenced by the commits added so far.
        let finalized_blocks = finalized_commits
            .iter()
            .flat_map(|commit| commit.blocks())
            .collect::<BTreeSet<_>>();

        // Flush the DAG state to the store.
        dag_state.flush();

        // The flushed blocks and commits must now be readable from the store.
        let store_blocks = store
            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
            .unwrap();
        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
        assert_eq!(
            store_commits.last().unwrap().index(),
            LAST_PERSISTED_COMMIT_INDEX
        );
        assert_eq!(
            store_commits.last().unwrap().round(),
            LAST_PERSISTED_COMMIT_ROUND
        );

        // Add the remaining blocks and commits; no flush follows, so these stay
        // in the in-memory state only.
        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
            dag_state.add_commit(commit);
        }

        // All blocks of all 20 rounds are retrievable from the live DAG state.
        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // The live state has seen all 15 commits.
        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);

        // Discard the live DAG state.
        drop(dag_state);

        // Recover a fresh DAG state from the same store.
        let dag_state = DagState::new(context.clone(), store.clone());

        // Blocks of the first 12 rounds are all found after recovery.
        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Blocks above round 12 were never flushed and must not be found.
        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
        let block_refs = missing_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let retrieved_blocks = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        assert!(retrieved_blocks.is_empty());

        // The recovered last commit is the last flushed one.
        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);

        // Expected last committed round per authority after recovery.
        let expected_last_committed_rounds = vec![5, 9, 8, 8];
        assert_eq!(
            dag_state.last_committed_rounds(),
            expected_last_committed_rounds
        );
        // One scoring subdag is expected per persisted commit.
        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);

        // Check the cached blocks per authority from round 1 onwards.
        for (authority_index, _) in context.committee.authorities() {
            let blocks = dag_state.get_cached_blocks(authority_index, 1);

            // Authority 0 is expected to have 4 cached blocks and the others 6;
            // in both cases the evicted round is 6 and the cached blocks fall in
            // rounds 7..=12.
            if authority_index == AuthorityIndex::new_for_test(0) {
                assert_eq!(blocks.len(), 4);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            } else {
                assert_eq!(blocks.len(), 6);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            }
        }

        // Every cached block above the GC round that belongs to a flushed commit
        // must be flagged as committed after recovery.
        let gc_round = dag_state.gc_round();
        assert_eq!(gc_round, 6);
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
                    assert!(
                        block_info.committed,
                        "Block {:?} should be set as committed",
                        block_ref
                    );
                }
            });

        // Cached blocks below round 12, or authored by authority 0, must be flagged
        // as included after recovery; all other cached blocks must not be.
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
                    assert!(block_info.included);
                } else {
                    assert!(!block_info.included);
                }
            });
    }
2122
2123 #[tokio::test]
2124 async fn test_block_info_as_committed() {
2125 let num_authorities: u32 = 4;
2126 let (context, _) = Context::new_for_test(num_authorities as usize);
2127 let context = Arc::new(context);
2128
2129 let store = Arc::new(MemStore::new());
2130 let mut dag_state = DagState::new(context.clone(), store.clone());
2131
2132 let block = VerifiedBlock::new_for_test(
2134 TestBlock::new(1, 0)
2135 .set_timestamp_ms(1000)
2136 .set_ancestors(vec![])
2137 .build(),
2138 );
2139
2140 dag_state.accept_block(block.clone());
2141
2142 assert!(!dag_state.is_committed(&block.reference()));
2144
2145 assert!(
2147 dag_state.set_committed(&block.reference()),
2148 "Block should be successfully set as committed for first time"
2149 );
2150
2151 assert!(dag_state.is_committed(&block.reference()));
2153
2154 assert!(
2156 !dag_state.set_committed(&block.reference()),
2157 "Block should not be successfully set as committed"
2158 );
2159 }
2160
2161 #[tokio::test]
2162 async fn test_get_cached_blocks() {
2163 let (mut context, _) = Context::new_for_test(4);
2164 context.parameters.dag_state_cached_rounds = 5;
2165
2166 let context = Arc::new(context);
2167 let store = Arc::new(MemStore::new());
2168 let mut dag_state = DagState::new(context.clone(), store.clone());
2169
2170 let mut all_blocks = Vec::new();
2175 for author in 1..=3 {
2176 for round in 10..(10 + author) {
2177 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2178 all_blocks.push(block.clone());
2179 dag_state.accept_block(block);
2180 }
2181 }
2182
2183 let cached_blocks =
2186 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2187 assert!(cached_blocks.is_empty());
2188
2189 let cached_blocks =
2190 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2191 assert_eq!(cached_blocks.len(), 1);
2192 assert_eq!(cached_blocks[0].round(), 10);
2193
2194 let cached_blocks =
2195 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2196 assert_eq!(cached_blocks.len(), 2);
2197 assert_eq!(cached_blocks[0].round(), 10);
2198 assert_eq!(cached_blocks[1].round(), 11);
2199
2200 let cached_blocks =
2201 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2202 assert_eq!(cached_blocks.len(), 1);
2203 assert_eq!(cached_blocks[0].round(), 11);
2204
2205 let cached_blocks =
2206 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2207 assert_eq!(cached_blocks.len(), 3);
2208 assert_eq!(cached_blocks[0].round(), 10);
2209 assert_eq!(cached_blocks[1].round(), 11);
2210 assert_eq!(cached_blocks[2].round(), 12);
2211
2212 let cached_blocks =
2213 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2214 assert_eq!(cached_blocks.len(), 1);
2215 assert_eq!(cached_blocks[0].round(), 12);
2216
2217 let cached_blocks = dag_state.get_cached_blocks_in_range(
2221 context.committee.to_authority_index(3).unwrap(),
2222 10,
2223 10,
2224 1,
2225 );
2226 assert!(cached_blocks.is_empty());
2227
2228 let cached_blocks = dag_state.get_cached_blocks_in_range(
2230 context.committee.to_authority_index(3).unwrap(),
2231 11,
2232 10,
2233 1,
2234 );
2235 assert!(cached_blocks.is_empty());
2236
2237 let cached_blocks = dag_state.get_cached_blocks_in_range(
2239 context.committee.to_authority_index(0).unwrap(),
2240 9,
2241 10,
2242 1,
2243 );
2244 assert!(cached_blocks.is_empty());
2245
2246 let cached_blocks = dag_state.get_cached_blocks_in_range(
2248 context.committee.to_authority_index(1).unwrap(),
2249 9,
2250 11,
2251 1,
2252 );
2253 assert_eq!(cached_blocks.len(), 1);
2254 assert_eq!(cached_blocks[0].round(), 10);
2255
2256 let cached_blocks = dag_state.get_cached_blocks_in_range(
2258 context.committee.to_authority_index(2).unwrap(),
2259 9,
2260 12,
2261 5,
2262 );
2263 assert_eq!(cached_blocks.len(), 2);
2264 assert_eq!(cached_blocks[0].round(), 10);
2265 assert_eq!(cached_blocks[1].round(), 11);
2266
2267 let cached_blocks = dag_state.get_cached_blocks_in_range(
2269 context.committee.to_authority_index(3).unwrap(),
2270 11,
2271 20,
2272 5,
2273 );
2274 assert_eq!(cached_blocks.len(), 2);
2275 assert_eq!(cached_blocks[0].round(), 11);
2276 assert_eq!(cached_blocks[1].round(), 12);
2277
2278 let cached_blocks = dag_state.get_cached_blocks_in_range(
2280 context.committee.to_authority_index(3).unwrap(),
2281 10,
2282 20,
2283 1,
2284 );
2285 assert_eq!(cached_blocks.len(), 1);
2286 assert_eq!(cached_blocks[0].round(), 10);
2287 }
2288
    /// Checks `get_last_cached_block_per_authority` and
    /// `get_last_cached_block_in_range` on a DAG where each authority stops
    /// producing blocks at a different round, before and after a flush.
    #[tokio::test]
    async fn test_get_last_cached_block() {
        const CACHED_ROUNDS: Round = 2;
        const GC_DEPTH: u32 = 1;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // As written, the DAG lists B, C and D in round 1, C and D in round 2 and
        // only D in round 3; A appears in no round after round 0.
        // NOTE(review): the letter-to-authority mapping (A = 0 .. D = 3) is assumed
        // from the expected rounds asserted below — confirm against `parse_dag`.
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 2 : {
                C -> [*],
                D -> [*],
            },
            Round 3 : {
                D -> [*],
            },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");

        // An extra round 2 block for authority 2, built separately from the DAG string.
        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());

        // Accept all parsed blocks followed by the extra block.
        for block in dag_builder
            .all_blocks()
            .into_iter()
            .chain(std::iter::once(block))
        {
            dag_state.accept_block(block);
        }

        // Add a commit whose leader is the round 3 leader block.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // With end_round = 4, the last cached block per authority is at rounds
        // 0, 1, 2 and 3, and only authority 2 has one additional block reported
        // in the second tuple element.
        let end_round = 4;
        let expected_rounds = vec![0, 1, 2, 3];
        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );
        assert_eq!(
            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
            expected_excluded_and_equivocating_blocks
        );

        // The per-authority range query from round 0 agrees with the rounds above.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // Starting the range at round 2 yields no block for authorities 0 and 1;
        // `unwrap_or_default` turns that into round 0.
        let start_round = 2;
        let expected_rounds = [0, 0, 2, 3];

        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    start_round,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // Flush the DAG state before repeating the queries with a lower end round.
        dag_state.flush();

        // With end_round = 3 the round 3 block of authority 3 is out of range, so
        // its last cached block is expected at round 2.
        let end_round = 3;
        let expected_rounds = vec![0, 1, 2, 2];

        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );

        // The per-authority range query from round 0 agrees after the flush too.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }
    }
2420
    /// Expects `get_last_cached_block_per_authority` to panic with the message
    /// given in `should_panic` when the requested end round is not above the last
    /// evicted round of authority 2.
    #[tokio::test]
    #[should_panic(
        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
    )]
    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
        const CACHED_ROUNDS: Round = 1;
        const GC_DEPTH: u32 = 1;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build three rounds, applying `skip_block` to a growing set of authorities:
        // authority 0 in round 1, authorities 0 and 1 in round 2, and authorities
        // 0, 1 and 2 in round 3.
        // NOTE(review): presumably the listed authorities produce no block in
        // those rounds — confirm against `DagBuilder`.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=1)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder
            .layers(2..=2)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1),
            ])
            .skip_block()
            .build();
        dag_builder
            .layers(3..=3)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1),
                AuthorityIndex::new_for_test(2),
            ])
            .skip_block()
            .build();

        // Accept every built block.
        for block in dag_builder.all_blocks() {
            dag_state.accept_block(block);
        }

        // Add a commit whose leader is the round 3 leader block.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            0,
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // Flush the DAG state; the panic message refers to an evicted round of 1
        // for authority 2 at this point.
        dag_state.flush();

        // This is the call expected to panic.
        dag_state.get_last_cached_block_per_authority(2);
    }
2484
2485 #[tokio::test]
2486 async fn test_last_quorum() {
2487 let (context, _) = Context::new_for_test(4);
2489 let context = Arc::new(context);
2490 let store = Arc::new(MemStore::new());
2491 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2492
2493 {
2495 let genesis = genesis_blocks(context.as_ref());
2496
2497 assert_eq!(dag_state.read().last_quorum(), genesis);
2498 }
2499
2500 {
2502 let mut dag_builder = DagBuilder::new(context.clone());
2503 dag_builder
2504 .layers(1..=4)
2505 .build()
2506 .persist_layers(dag_state.clone());
2507 let round_4_blocks: Vec<_> = dag_builder
2508 .blocks(4..=4)
2509 .into_iter()
2510 .map(|block| block.reference())
2511 .collect();
2512
2513 let last_quorum = dag_state.read().last_quorum();
2514
2515 assert_eq!(
2516 last_quorum
2517 .into_iter()
2518 .map(|block| block.reference())
2519 .collect::<Vec<_>>(),
2520 round_4_blocks
2521 );
2522 }
2523
2524 {
2526 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2527 dag_state.write().accept_block(block);
2528
2529 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2530
2531 let last_quorum = dag_state.read().last_quorum();
2532
2533 assert_eq!(last_quorum, round_4_blocks);
2534 }
2535 }
2536
2537 #[tokio::test]
2538 async fn test_last_block_for_authority() {
2539 let (context, _) = Context::new_for_test(4);
2541 let context = Arc::new(context);
2542 let store = Arc::new(MemStore::new());
2543 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2544
2545 {
2547 let genesis = genesis_blocks(context.as_ref());
2548 let my_genesis = genesis
2549 .into_iter()
2550 .find(|block| block.author() == context.own_index)
2551 .unwrap();
2552
2553 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2554 }
2555
2556 {
2558 let mut dag_builder = DagBuilder::new(context.clone());
2560 dag_builder
2561 .layers(1..=4)
2562 .build()
2563 .persist_layers(dag_state.clone());
2564
2565 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2567 dag_state.write().accept_block(block);
2568
2569 let block = dag_state
2570 .read()
2571 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2572 assert_eq!(block.round(), 5);
2573
2574 for (authority_index, _) in context.committee.authorities() {
2575 let block = dag_state
2576 .read()
2577 .get_last_block_for_authority(authority_index);
2578
2579 if authority_index.value() == 0 {
2580 assert_eq!(block.round(), 5);
2581 } else {
2582 assert_eq!(block.round(), 4);
2583 }
2584 }
2585 }
2586 }
2587
2588 #[tokio::test]
2589 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2590 let (context, _) = Context::new_for_test(4);
2592 let context = Arc::new(context);
2593 let store = Arc::new(MemStore::new());
2594 let mut dag_state = DagState::new(context.clone(), store.clone());
2595
2596 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2598
2599 let block = VerifiedBlock::new_for_test(
2600 TestBlock::new(10, 0)
2601 .set_timestamp_ms(block_timestamp)
2602 .build(),
2603 );
2604
2605 dag_state.accept_block(block);
2607 }
2608
2609 #[tokio::test]
2610 async fn test_last_finalized_commit() {
2611 let (context, _) = Context::new_for_test(4);
2613 let context = Arc::new(context);
2614 let store = Arc::new(MemStore::new());
2615 let mut dag_state = DagState::new(context.clone(), store.clone());
2616
2617 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2619 let rejected_transactions = BTreeMap::new();
2620 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2621
2622 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2624 assert_eq!(
2625 dag_state.finalized_commits_to_write[0],
2626 (commit_ref, rejected_transactions.clone())
2627 );
2628
2629 dag_state.flush();
2631
2632 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2634 assert_eq!(last_finalized_commit, Some(commit_ref));
2635 let stored_rejected_transactions = store
2636 .read_rejected_transactions(commit_ref)
2637 .unwrap()
2638 .unwrap();
2639 assert_eq!(stored_rejected_transactions, rejected_transactions);
2640 }
2641}