1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21 CommittedSubDag,
22 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23 commit::{
24 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26 },
27 context::Context,
28 leader_scoring::{ReputationScores, ScoringSubdag},
29 storage::{Store, WriteBatch},
30 threshold_clock::ThresholdClock,
31};
32
33pub struct DagState {
41 context: Arc<Context>,
42
43 genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59 threshold_clock: ThresholdClock,
61
62 evicted_rounds: Vec<Round>,
66
67 highest_accepted_round: Round,
69
70 last_commit: Option<TrustedCommit>,
72
73 last_commit_round_advancement_time: Option<std::time::Instant>,
75
76 last_committed_rounds: Vec<Round>,
78
79 scoring_subdag: ScoringSubdag,
82
83 pending_commit_votes: VecDeque<CommitVote>,
87
88 blocks_to_write: Vec<VerifiedBlock>,
91 commits_to_write: Vec<TrustedCommit>,
92
93 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98 finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101 store: Arc<dyn Store>,
103
104 cached_rounds: Round,
106}
107
108impl DagState {
109 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112 let num_authorities = context.committee.size();
113
114 let genesis = genesis_blocks(context.as_ref())
115 .into_iter()
116 .map(|block| (block.reference(), block))
117 .collect();
118
119 let threshold_clock = ThresholdClock::new(1, context.clone());
120
121 let last_commit = store
122 .read_last_commit()
123 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125 let commit_info = store
126 .read_last_commit_info()
127 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128 let (mut last_committed_rounds, commit_recovery_start_index) =
129 if let Some((commit_ref, commit_info)) = commit_info {
130 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131 (commit_info.committed_rounds, commit_ref.index + 1)
132 } else {
133 tracing::info!("Found no stored CommitInfo to recover from");
134 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135 };
136
137 let mut unscored_committed_subdags = Vec::new();
138 let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140 if let Some(last_commit) = last_commit.as_ref() {
141 store
142 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144 .iter()
145 .for_each(|commit| {
146 for block_ref in commit.blocks() {
147 last_committed_rounds[block_ref.author] =
148 max(last_committed_rounds[block_ref.author], block_ref.round);
149 }
150 let committed_subdag =
151 load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
152 unscored_committed_subdags.push(committed_subdag);
154 });
155 }
156
157 tracing::info!(
158 "DagState was initialized with the following state: \
159 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
160 unscored_committed_subdags.len()
161 );
162
163 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
164
165 let mut state = Self {
166 context: context.clone(),
167 genesis,
168 recent_blocks: BTreeMap::new(),
169 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
170 threshold_clock,
171 highest_accepted_round: 0,
172 last_commit: last_commit.clone(),
173 last_commit_round_advancement_time: None,
174 last_committed_rounds: last_committed_rounds.clone(),
175 pending_commit_votes: VecDeque::new(),
176 blocks_to_write: vec![],
177 commits_to_write: vec![],
178 commit_info_to_write: vec![],
179 finalized_commits_to_write: vec![],
180 scoring_subdag,
181 store: store.clone(),
182 cached_rounds,
183 evicted_rounds: vec![0; num_authorities],
184 };
185
186 for (authority_index, _) in context.committee.authorities() {
187 let (blocks, eviction_round) = {
188 let last_block = state
191 .store
192 .scan_last_blocks_by_author(authority_index, 1, None)
193 .expect("Database error");
194 let last_block_round = last_block
195 .last()
196 .map(|b| b.round())
197 .unwrap_or(GENESIS_ROUND);
198
199 let eviction_round =
200 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
201 let blocks = state
202 .store
203 .scan_blocks_by_author(authority_index, eviction_round + 1)
204 .expect("Database error");
205
206 (blocks, eviction_round)
207 };
208
209 state.evicted_rounds[authority_index] = eviction_round;
210
211 for block in &blocks {
213 state.update_block_metadata(block);
214 }
215
216 debug!(
217 "Recovered blocks {}: {:?}",
218 authority_index,
219 blocks
220 .iter()
221 .map(|b| b.reference())
222 .collect::<Vec<BlockRef>>()
223 );
224 }
225
226 if let Some(last_commit) = last_commit {
227 let mut index = last_commit.index();
228 let gc_round = state.gc_round();
229 info!(
230 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
231 index, gc_round
232 );
233
234 loop {
235 let commits = store
236 .scan_commits((index..=index).into())
237 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
238 let Some(commit) = commits.first() else {
239 info!("Recovering finished up to index {index}, no more commits to recover");
240 break;
241 };
242
243 if gc_round > 0 && commit.leader().round <= gc_round {
245 info!(
246 "Recovering finished, reached commit leader round {} <= gc_round {}",
247 commit.leader().round,
248 gc_round
249 );
250 break;
251 }
252
253 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
254 debug!(
255 "Setting block {:?} as committed based on commit {:?}",
256 block_ref,
257 commit.index()
258 );
259 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
260 });
261
262 index = index.saturating_sub(1);
264 if index == 0 {
265 break;
266 }
267 }
268 }
269
270 let proposed_blocks = store
272 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
273 .expect("Database error");
274 for block in proposed_blocks {
275 state.link_causal_history(block.reference());
276 }
277
278 state
279 }
280
281 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283 assert_ne!(
284 block.round(),
285 0,
286 "Genesis block should not be accepted into DAG."
287 );
288
289 let block_ref = block.reference();
290 if self.contains_block(&block_ref) {
291 return;
292 }
293
294 let now = self.context.clock.timestamp_utc_ms();
295 if block.timestamp_ms() > now {
296 trace!(
297 "Block {:?} with timestamp {} is greater than local timestamp {}.",
298 block,
299 block.timestamp_ms(),
300 now,
301 );
302 }
303 let hostname = &self.context.committee.authority(block_ref.author).hostname;
304 self.context
305 .metrics
306 .node_metrics
307 .accepted_block_time_drift_ms
308 .with_label_values(&[hostname])
309 .inc_by(block.timestamp_ms().saturating_sub(now));
310
311 if block_ref.author == self.context.own_index {
314 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315 assert!(
316 existing_blocks.is_empty(),
317 "Block Rejected! Attempted to add block {block:#?} to own slot where \
318 block(s) {existing_blocks:#?} already exists."
319 );
320 }
321 self.update_block_metadata(&block);
322 self.blocks_to_write.push(block);
323 let source = if self.context.own_index == block_ref.author {
324 "own"
325 } else {
326 "others"
327 };
328 self.context
329 .metrics
330 .node_metrics
331 .accepted_blocks
332 .with_label_values(&[source])
333 .inc();
334 }
335
336 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338 let block_ref = block.reference();
339 self.recent_blocks
340 .insert(block_ref, BlockInfo::new(block.clone()));
341 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343 if self.threshold_clock.add_block(block_ref) {
344 let last_proposed_block = self.get_last_proposed_block();
346 if last_proposed_block.round() == block_ref.round {
347 let quorum_delay_ms = self
348 .context
349 .clock
350 .timestamp_utc_ms()
351 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352 self.context
353 .metrics
354 .node_metrics
355 .quorum_receive_latency
356 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357 }
358 }
359
360 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361 self.context
362 .metrics
363 .node_metrics
364 .highest_accepted_round
365 .set(self.highest_accepted_round as i64);
366
367 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368 .last()
369 .map(|block_ref| block_ref.round)
370 .expect("There should be by now at least one block ref");
371 let hostname = &self.context.committee.authority(block_ref.author).hostname;
372 self.context
373 .metrics
374 .node_metrics
375 .highest_accepted_authority_round
376 .with_label_values(&[hostname])
377 .set(highest_accepted_round_for_author as i64);
378 }
379
380 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382 debug!(
383 "Accepting blocks: {}",
384 blocks.iter().map(|b| b.reference().to_string()).join(",")
385 );
386 for block in blocks {
387 self.accept_block(block);
388 }
389 }
390
391 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394 self.get_blocks(&[*reference])
395 .pop()
396 .expect("Exactly one element should be returned")
397 }
398
399 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402 let mut blocks = vec![None; block_refs.len()];
403 let mut missing = Vec::new();
404
405 for (index, block_ref) in block_refs.iter().enumerate() {
406 if block_ref.round == GENESIS_ROUND {
407 if let Some(block) = self.genesis.get(block_ref) {
409 blocks[index] = Some(block.clone());
410 }
411 continue;
412 }
413 if let Some(block_info) = self.recent_blocks.get(block_ref) {
414 blocks[index] = Some(block_info.block.clone());
415 continue;
416 }
417 missing.push((index, block_ref));
418 }
419
420 if missing.is_empty() {
421 return blocks;
422 }
423
424 let missing_refs = missing
425 .iter()
426 .map(|(_, block_ref)| **block_ref)
427 .collect::<Vec<_>>();
428 let store_results = self
429 .store
430 .read_blocks(&missing_refs)
431 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432 self.context
433 .metrics
434 .node_metrics
435 .dag_state_store_read_count
436 .with_label_values(&["get_blocks"])
437 .inc();
438
439 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440 blocks[index] = result;
441 }
442
443 blocks
444 }
445
446 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449 let mut blocks = vec![];
453 for (_block_ref, block_info) in self.recent_blocks.range((
454 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456 )) {
457 blocks.push(block_info.block.clone())
458 }
459 blocks
460 }
461
462 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465 if round <= self.last_commit_round() {
466 panic!("Round {} have committed blocks!", round);
467 }
468
469 let mut blocks = vec![];
470 for (_block_ref, block_info) in self.recent_blocks.range((
471 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472 Excluded(BlockRef::new(
473 round + 1,
474 AuthorityIndex::ZERO,
475 BlockDigest::MIN,
476 )),
477 )) {
478 blocks.push(block_info.block.clone())
479 }
480 blocks
481 }
482
483 pub(crate) fn ancestors_at_round(
485 &self,
486 later_block: &VerifiedBlock,
487 earlier_round: Round,
488 ) -> Vec<VerifiedBlock> {
489 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491 while !linked.is_empty() {
492 let round = linked.last().unwrap().round;
493 if round <= earlier_round {
495 break;
496 }
497 let block_ref = linked.pop_last().unwrap();
498 let Some(block) = self.get_block(&block_ref) else {
499 panic!("Block {:?} should exist in DAG!", block_ref);
500 };
501 linked.extend(block.ancestors().iter().cloned());
502 }
503 linked
504 .range((
505 Included(BlockRef::new(
506 earlier_round,
507 AuthorityIndex::ZERO,
508 BlockDigest::MIN,
509 )),
510 Unbounded,
511 ))
512 .map(|r| {
513 self.get_block(r)
514 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515 .clone()
516 })
517 .collect()
518 }
519
520 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
523 self.get_last_block_for_authority(self.context.own_index)
524 }
525
526 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529 if let Some(last) = self.recent_refs_by_authority[authority].last() {
530 return self
531 .recent_blocks
532 .get(last)
533 .expect("Block should be found in recent blocks")
534 .block
535 .clone();
536 }
537
538 let (_, genesis_block) = self
540 .genesis
541 .iter()
542 .find(|(block_ref, _)| block_ref.author == authority)
543 .expect("Genesis should be found for authority {authority_index}");
544 genesis_block.clone()
545 }
546
547 pub(crate) fn get_cached_blocks(
553 &self,
554 authority: AuthorityIndex,
555 start: Round,
556 ) -> Vec<VerifiedBlock> {
557 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
558 }
559
560 pub(crate) fn get_cached_blocks_in_range(
563 &self,
564 authority: AuthorityIndex,
565 start_round: Round,
566 end_round: Round,
567 limit: usize,
568 ) -> Vec<VerifiedBlock> {
569 if start_round >= end_round || limit == 0 {
570 return vec![];
571 }
572
573 let mut blocks = vec![];
574 for block_ref in self.recent_refs_by_authority[authority].range((
575 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576 Excluded(BlockRef::new(
577 end_round,
578 AuthorityIndex::MIN,
579 BlockDigest::MIN,
580 )),
581 )) {
582 let block_info = self
583 .recent_blocks
584 .get(block_ref)
585 .expect("Block should exist in recent blocks");
586 blocks.push(block_info.block.clone());
587 if blocks.len() >= limit {
588 break;
589 }
590 }
591 blocks
592 }
593
594 pub(crate) fn get_last_cached_block_in_range(
596 &self,
597 authority: AuthorityIndex,
598 start_round: Round,
599 end_round: Round,
600 ) -> Option<VerifiedBlock> {
601 if start_round >= end_round {
602 return None;
603 }
604
605 let block_ref = self.recent_refs_by_authority[authority]
606 .range((
607 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608 Excluded(BlockRef::new(
609 end_round,
610 AuthorityIndex::MIN,
611 BlockDigest::MIN,
612 )),
613 ))
614 .last()?;
615
616 self.recent_blocks
617 .get(block_ref)
618 .map(|block_info| block_info.block.clone())
619 }
620
621 pub(crate) fn get_last_cached_block_per_authority(
628 &self,
629 end_round: Round,
630 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635 if end_round == GENESIS_ROUND {
636 panic!(
637 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638 );
639 }
640
641 if end_round == GENESIS_ROUND + 1 {
642 return blocks.into_iter().map(|b| (b, vec![])).collect();
643 }
644
645 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646 let authority_index = self
647 .context
648 .committee
649 .to_authority_index(authority_index)
650 .unwrap();
651
652 let last_evicted_round = self.evicted_rounds[authority_index];
653 if end_round.saturating_sub(1) <= last_evicted_round {
654 panic!(
655 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656 );
657 }
658
659 let block_ref_iter = block_refs
660 .range((
661 Included(BlockRef::new(
662 last_evicted_round + 1,
663 authority_index,
664 BlockDigest::MIN,
665 )),
666 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667 ))
668 .rev();
669
670 let mut last_round = 0;
671 for block_ref in block_ref_iter {
672 if last_round == 0 {
673 last_round = block_ref.round;
674 let block_info = self
675 .recent_blocks
676 .get(block_ref)
677 .expect("Block should exist in recent blocks");
678 blocks[authority_index] = block_info.block.clone();
679 continue;
680 }
681 if block_ref.round < last_round {
682 break;
683 }
684 equivocating_blocks[authority_index].push(*block_ref);
685 }
686 }
687
688 blocks.into_iter().zip(equivocating_blocks).collect()
689 }
690
691 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694 if slot.round == GENESIS_ROUND {
696 return true;
697 }
698
699 let eviction_round = self.evicted_rounds[slot.authority];
700 if slot.round <= eviction_round {
701 panic!(
702 "{}",
703 format!(
704 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705 )
706 );
707 }
708
709 let mut result = self.recent_refs_by_authority[slot.authority].range((
710 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712 ));
713 result.next().is_some()
714 }
715
716 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719 let mut exist = vec![false; block_refs.len()];
720 let mut missing = Vec::new();
721
722 for (index, block_ref) in block_refs.into_iter().enumerate() {
723 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725 exist[index] = true;
726 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727 {
728 exist[index] = false;
732 } else {
733 missing.push((index, block_ref));
734 }
735 }
736
737 if missing.is_empty() {
738 return exist;
739 }
740
741 let missing_refs = missing
742 .iter()
743 .map(|(_, block_ref)| *block_ref)
744 .collect::<Vec<_>>();
745 let store_results = self
746 .store
747 .contains_blocks(&missing_refs)
748 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749 self.context
750 .metrics
751 .node_metrics
752 .dag_state_store_read_count
753 .with_label_values(&["contains_blocks"])
754 .inc();
755
756 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757 exist[index] = result;
758 }
759
760 exist
761 }
762
763 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764 let blocks = self.contains_blocks(vec![*block_ref]);
765 blocks.first().cloned().unwrap()
766 }
767
768 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772 if !block_info.committed {
773 block_info.committed = true;
774 return true;
775 }
776 false
777 } else {
778 panic!(
779 "Block {:?} not found in cache to set as committed.",
780 block_ref
781 );
782 }
783 }
784
785 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787 self.recent_blocks
788 .get(block_ref)
789 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790 .committed
791 }
792
793 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
797 let gc_round = self.gc_round();
798 let mut linked_blocks = vec![];
799 let mut targets = VecDeque::new();
800 targets.push_back(root_block);
801 while let Some(block_ref) = targets.pop_front() {
802 if block_ref.round <= gc_round {
804 continue;
805 }
806 let block_info = self
807 .recent_blocks
808 .get_mut(&block_ref)
809 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
810 if block_info.hard_linked {
811 continue;
812 }
813 linked_blocks.push(block_ref);
814 block_info.hard_linked = true;
815 targets.extend(block_info.block.ancestors().iter());
816 }
817 linked_blocks
818 }
819
820 pub(crate) fn is_hard_linked(&self, block_ref: &BlockRef) -> bool {
822 self.recent_blocks
823 .get(block_ref)
824 .unwrap_or_else(|| panic!("Attempted to query for hard link status for a block not in cached data {block_ref}"))
825 .hard_linked
826 }
827
828 pub(crate) fn threshold_clock_round(&self) -> Round {
829 self.threshold_clock.get_round()
830 }
831
832 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
834 self.threshold_clock.get_quorum_ts()
835 }
836
837 pub(crate) fn highest_accepted_round(&self) -> Round {
838 self.highest_accepted_round
839 }
840
841 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
844 let time_diff = if let Some(last_commit) = &self.last_commit {
845 if commit.index() <= last_commit.index() {
846 error!(
847 "New commit index {} <= last commit index {}!",
848 commit.index(),
849 last_commit.index()
850 );
851 return;
852 }
853 assert_eq!(commit.index(), last_commit.index() + 1);
854
855 if commit.timestamp_ms() < last_commit.timestamp_ms() {
856 panic!(
857 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
858 last_commit, commit
859 );
860 }
861 commit
862 .timestamp_ms()
863 .saturating_sub(last_commit.timestamp_ms())
864 } else {
865 assert_eq!(commit.index(), 1);
866 0
867 };
868
869 self.context
870 .metrics
871 .node_metrics
872 .last_commit_time_diff
873 .observe(time_diff as f64);
874
875 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
876 previous_commit.round() < commit.round()
877 } else {
878 true
879 };
880
881 self.last_commit = Some(commit.clone());
882
883 if commit_round_advanced {
884 let now = std::time::Instant::now();
885 if let Some(previous_time) = self.last_commit_round_advancement_time {
886 self.context
887 .metrics
888 .node_metrics
889 .commit_round_advancement_interval
890 .observe(now.duration_since(previous_time).as_secs_f64())
891 }
892 self.last_commit_round_advancement_time = Some(now);
893 }
894
895 for block_ref in commit.blocks().iter() {
896 self.last_committed_rounds[block_ref.author] = max(
897 self.last_committed_rounds[block_ref.author],
898 block_ref.round,
899 );
900 }
901
902 for (i, round) in self.last_committed_rounds.iter().enumerate() {
903 let index = self.context.committee.to_authority_index(i).unwrap();
904 let hostname = &self.context.committee.authority(index).hostname;
905 self.context
906 .metrics
907 .node_metrics
908 .last_committed_authority_round
909 .with_label_values(&[hostname])
910 .set((*round).into());
911 }
912
913 self.pending_commit_votes.push_back(commit.reference());
914 self.commits_to_write.push(commit);
915 }
916
917 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
919 self.commits_to_write.extend(commits);
920 }
921
922 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
923 assert!(
924 self.commits_to_write.is_empty(),
925 "Commits to write should be empty. {:?}",
926 self.commits_to_write,
927 );
928 }
929
930 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
931 assert!(self.scoring_subdag.is_empty());
935
936 let commit_info = CommitInfo {
937 committed_rounds: self.last_committed_rounds.clone(),
938 reputation_scores,
939 };
940 let last_commit = self
941 .last_commit
942 .as_ref()
943 .expect("Last commit should already be set.");
944 self.commit_info_to_write
945 .push((last_commit.reference(), commit_info));
946 }
947
948 pub(crate) fn add_finalized_commit(
949 &mut self,
950 commit_ref: CommitRef,
951 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
952 ) {
953 self.finalized_commits_to_write
954 .push((commit_ref, rejected_transactions));
955 }
956
957 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
958 let mut votes = Vec::new();
959 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
960 votes.push(self.pending_commit_votes.pop_front().unwrap());
961 }
962 votes
963 }
964
965 pub(crate) fn last_commit_index(&self) -> CommitIndex {
967 match &self.last_commit {
968 Some(commit) => commit.index(),
969 None => 0,
970 }
971 }
972
973 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
975 match &self.last_commit {
976 Some(commit) => commit.digest(),
977 None => CommitDigest::MIN,
978 }
979 }
980
981 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
983 match &self.last_commit {
984 Some(commit) => commit.timestamp_ms(),
985 None => 0,
986 }
987 }
988
989 pub(crate) fn last_commit_leader(&self) -> Slot {
991 match &self.last_commit {
992 Some(commit) => commit.leader().into(),
993 None => self
994 .genesis
995 .iter()
996 .next()
997 .map(|(genesis_ref, _)| *genesis_ref)
998 .expect("Genesis blocks should always be available.")
999 .into(),
1000 }
1001 }
1002
1003 pub(crate) fn last_commit_round(&self) -> Round {
1005 match &self.last_commit {
1006 Some(commit) => commit.leader().round,
1007 None => 0,
1008 }
1009 }
1010
1011 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1013 self.last_committed_rounds.clone()
1014 }
1015
1016 pub(crate) fn gc_round(&self) -> Round {
1020 self.calculate_gc_round(self.last_commit_round())
1021 }
1022
1023 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1026 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1027 }
1028
1029 pub(crate) fn flush(&mut self) {
1039 let _s = self
1040 .context
1041 .metrics
1042 .node_metrics
1043 .scope_processing_time
1044 .with_label_values(&["DagState::flush"])
1045 .start_timer();
1046
1047 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1049 let pending_commits = std::mem::take(&mut self.commits_to_write);
1050 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1051 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1052 if pending_blocks.is_empty()
1053 && pending_commits.is_empty()
1054 && pending_commit_info.is_empty()
1055 && pending_finalized_commits.is_empty()
1056 {
1057 return;
1058 }
1059
1060 debug!(
1061 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1062 pending_blocks.len(),
1063 pending_blocks
1064 .iter()
1065 .map(|b| b.reference().to_string())
1066 .join(","),
1067 pending_commits.len(),
1068 pending_commits
1069 .iter()
1070 .map(|c| c.reference().to_string())
1071 .join(","),
1072 pending_commit_info.len(),
1073 pending_commit_info
1074 .iter()
1075 .map(|(commit_ref, _)| commit_ref.to_string())
1076 .join(","),
1077 pending_finalized_commits.len(),
1078 pending_finalized_commits
1079 .iter()
1080 .map(|(commit_ref, _)| commit_ref.to_string())
1081 .join(","),
1082 );
1083 self.store
1084 .write(WriteBatch::new(
1085 pending_blocks,
1086 pending_commits,
1087 pending_commit_info,
1088 pending_finalized_commits,
1089 ))
1090 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1091 self.context
1092 .metrics
1093 .node_metrics
1094 .dag_state_store_write_count
1095 .inc();
1096
1097 for (authority_index, _) in self.context.committee.authorities() {
1099 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1100 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1101 if block_ref.round <= eviction_round {
1102 self.recent_blocks.remove(block_ref);
1103 self.recent_refs_by_authority[authority_index].pop_first();
1104 } else {
1105 break;
1106 }
1107 }
1108 self.evicted_rounds[authority_index] = eviction_round;
1109 }
1110
1111 let metrics = &self.context.metrics.node_metrics;
1112 metrics
1113 .dag_state_recent_blocks
1114 .set(self.recent_blocks.len() as i64);
1115 metrics.dag_state_recent_refs.set(
1116 self.recent_refs_by_authority
1117 .iter()
1118 .map(BTreeSet::len)
1119 .sum::<usize>() as i64,
1120 );
1121 }
1122
1123 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1124 self.store
1125 .read_last_commit_info()
1126 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1127 }
1128
1129 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1130 self.scoring_subdag.add_subdags(scoring_subdags);
1131 }
1132
1133 pub(crate) fn clear_scoring_subdag(&mut self) {
1134 self.scoring_subdag.clear();
1135 }
1136
1137 pub(crate) fn scoring_subdags_count(&self) -> usize {
1138 self.scoring_subdag.scored_subdags_count()
1139 }
1140
1141 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1142 self.scoring_subdag.is_empty()
1143 }
1144
1145 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1146 self.scoring_subdag.calculate_distributed_vote_scores()
1147 }
1148
1149 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1150 self.scoring_subdag
1151 .commit_range
1152 .as_ref()
1153 .expect("commit range should exist for scoring subdag")
1154 .end()
1155 }
1156
1157 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1161 let last_round = self.recent_refs_by_authority[authority_index]
1162 .last()
1163 .map(|block_ref| block_ref.round)
1164 .unwrap_or(GENESIS_ROUND);
1165
1166 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1167 }
1168
1169 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1172 gc_round.min(last_round.saturating_sub(cached_rounds))
1173 }
1174
1175 pub(crate) fn store(&self) -> Arc<dyn Store> {
1177 self.store.clone()
1178 }
1179
1180 #[cfg(test)]
1183 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1184 for round in
1187 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1188 {
1189 if round == GENESIS_ROUND {
1190 return self.genesis_blocks();
1191 }
1192 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1193 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1194
1195 let blocks = self.get_uncommitted_blocks_at_round(round);
1197 for block in &blocks {
1198 if quorum.add(block.author(), &self.context.committee) {
1199 return blocks;
1200 }
1201 }
1202 }
1203
1204 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1205 }
1206
1207 #[cfg(test)]
1208 pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1209 self.genesis.values().cloned().collect()
1210 }
1211
1212 #[cfg(test)]
1213 pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1214 self.last_commit = Some(commit);
1215 }
1216}
1217
1218struct BlockInfo {
1219 block: VerifiedBlock,
1220 committed: bool,
1222 hard_linked: bool,
1224}
1225
1226impl BlockInfo {
1227 fn new(block: VerifiedBlock) -> Self {
1228 Self {
1229 block,
1230 committed: false,
1231 hard_linked: false,
1232 }
1233 }
1234}
1235
1236#[cfg(test)]
1237mod test {
1238 use std::vec;
1239
1240 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1241 use parking_lot::RwLock;
1242
1243 use super::*;
1244 use crate::{
1245 block::{TestBlock, VerifiedBlock},
1246 storage::{WriteBatch, mem_store::MemStore},
1247 test_dag_builder::DagBuilder,
1248 test_dag_parser::parse_dag,
1249 };
1250
1251 #[tokio::test]
1252 async fn test_get_blocks() {
1253 let (context, _) = Context::new_for_test(4);
1254 let context = Arc::new(context);
1255 let store = Arc::new(MemStore::new());
1256 let mut dag_state = DagState::new(context.clone(), store.clone());
1257 let own_index = AuthorityIndex::new_for_test(0);
1258
1259 let num_rounds: u32 = 10;
1261 let non_existent_round: u32 = 100;
1262 let num_authorities: u32 = 3;
1263 let num_blocks_per_slot: usize = 3;
1264 let mut blocks = BTreeMap::new();
1265 for round in 1..=num_rounds {
1266 for author in 0..num_authorities {
1267 let base_ts = round as BlockTimestampMs * 1000;
1269 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1270 let block = VerifiedBlock::new_for_test(
1271 TestBlock::new(round, author)
1272 .set_timestamp_ms(timestamp)
1273 .build(),
1274 );
1275 dag_state.accept_block(block.clone());
1276 blocks.insert(block.reference(), block);
1277
1278 if AuthorityIndex::new_for_test(author) == own_index {
1280 break;
1281 }
1282 }
1283 }
1284 }
1285
1286 for (r, block) in &blocks {
1288 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1289 }
1290
1291 let last_ref = blocks.keys().last().unwrap();
1293 assert!(
1294 dag_state
1295 .get_block(&BlockRef::new(
1296 last_ref.round,
1297 last_ref.author,
1298 BlockDigest::MIN
1299 ))
1300 .is_none()
1301 );
1302
1303 for round in 1..=num_rounds {
1305 for author in 0..num_authorities {
1306 let slot = Slot::new(
1307 round,
1308 context
1309 .committee
1310 .to_authority_index(author as usize)
1311 .unwrap(),
1312 );
1313 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1314
1315 if AuthorityIndex::new_for_test(author) == own_index {
1317 assert_eq!(blocks.len(), 1);
1318 } else {
1319 assert_eq!(blocks.len(), num_blocks_per_slot);
1320 }
1321
1322 for b in blocks {
1323 assert_eq!(b.round(), round);
1324 assert_eq!(
1325 b.author(),
1326 context
1327 .committee
1328 .to_authority_index(author as usize)
1329 .unwrap()
1330 );
1331 }
1332 }
1333 }
1334
1335 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1337 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1338
1339 for round in 1..=num_rounds {
1341 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1342 assert_eq!(
1345 blocks.len(),
1346 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1347 );
1348 for b in blocks {
1349 assert_eq!(b.round(), round);
1350 }
1351 }
1352
1353 assert!(
1355 dag_state
1356 .get_uncommitted_blocks_at_round(non_existent_round)
1357 .is_empty()
1358 );
1359 }
1360
1361 #[tokio::test]
1362 async fn test_ancestors_at_uncommitted_round() {
1363 let (context, _) = Context::new_for_test(4);
1365 let context = Arc::new(context);
1366 let store = Arc::new(MemStore::new());
1367 let mut dag_state = DagState::new(context.clone(), store.clone());
1368
1369 let round_10_refs: Vec<_> = (0..4)
1373 .map(|a| {
1374 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1375 .reference()
1376 })
1377 .collect();
1378
1379 let round_11 = vec![
1381 VerifiedBlock::new_for_test(
1383 TestBlock::new(11, 0)
1384 .set_timestamp_ms(1100)
1385 .set_ancestors(round_10_refs.clone())
1386 .build(),
1387 ),
1388 VerifiedBlock::new_for_test(
1391 TestBlock::new(11, 1)
1392 .set_timestamp_ms(1110)
1393 .set_ancestors(round_10_refs.clone())
1394 .build(),
1395 ),
1396 VerifiedBlock::new_for_test(
1398 TestBlock::new(11, 1)
1399 .set_timestamp_ms(1111)
1400 .set_ancestors(round_10_refs.clone())
1401 .build(),
1402 ),
1403 VerifiedBlock::new_for_test(
1405 TestBlock::new(11, 1)
1406 .set_timestamp_ms(1112)
1407 .set_ancestors(round_10_refs.clone())
1408 .build(),
1409 ),
1410 VerifiedBlock::new_for_test(
1412 TestBlock::new(11, 2)
1413 .set_timestamp_ms(1120)
1414 .set_ancestors(round_10_refs.clone())
1415 .build(),
1416 ),
1417 VerifiedBlock::new_for_test(
1419 TestBlock::new(11, 3)
1420 .set_timestamp_ms(1130)
1421 .set_ancestors(round_10_refs.clone())
1422 .build(),
1423 ),
1424 ];
1425
1426 let ancestors_for_round_12 = vec![
1428 round_11[0].reference(),
1429 round_11[1].reference(),
1430 round_11[5].reference(),
1431 ];
1432 let round_12 = vec![
1433 VerifiedBlock::new_for_test(
1434 TestBlock::new(12, 0)
1435 .set_timestamp_ms(1200)
1436 .set_ancestors(ancestors_for_round_12.clone())
1437 .build(),
1438 ),
1439 VerifiedBlock::new_for_test(
1440 TestBlock::new(12, 2)
1441 .set_timestamp_ms(1220)
1442 .set_ancestors(ancestors_for_round_12.clone())
1443 .build(),
1444 ),
1445 VerifiedBlock::new_for_test(
1446 TestBlock::new(12, 3)
1447 .set_timestamp_ms(1230)
1448 .set_ancestors(ancestors_for_round_12.clone())
1449 .build(),
1450 ),
1451 ];
1452
1453 let ancestors_for_round_13 = vec![
1455 round_12[0].reference(),
1456 round_12[1].reference(),
1457 round_12[2].reference(),
1458 round_11[2].reference(),
1459 ];
1460 let round_13 = vec![
1461 VerifiedBlock::new_for_test(
1462 TestBlock::new(12, 1)
1463 .set_timestamp_ms(1300)
1464 .set_ancestors(ancestors_for_round_13.clone())
1465 .build(),
1466 ),
1467 VerifiedBlock::new_for_test(
1468 TestBlock::new(12, 2)
1469 .set_timestamp_ms(1320)
1470 .set_ancestors(ancestors_for_round_13.clone())
1471 .build(),
1472 ),
1473 VerifiedBlock::new_for_test(
1474 TestBlock::new(12, 3)
1475 .set_timestamp_ms(1330)
1476 .set_ancestors(ancestors_for_round_13.clone())
1477 .build(),
1478 ),
1479 ];
1480
1481 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1483 let anchor = VerifiedBlock::new_for_test(
1484 TestBlock::new(14, 1)
1485 .set_timestamp_ms(1410)
1486 .set_ancestors(ancestors_for_round_14)
1487 .build(),
1488 );
1489
1490 for b in round_11
1492 .iter()
1493 .chain(round_12.iter())
1494 .chain(round_13.iter())
1495 .chain([anchor.clone()].iter())
1496 {
1497 dag_state.accept_block(b.clone());
1498 }
1499
1500 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1502 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1503 ancestors_refs.sort();
1504 let mut expected_refs = vec![
1505 round_11[0].reference(),
1506 round_11[1].reference(),
1507 round_11[2].reference(),
1508 round_11[5].reference(),
1509 ];
1510 expected_refs.sort(); assert_eq!(
1512 ancestors_refs, expected_refs,
1513 "Expected round 11 ancestors: {:?}. Got: {:?}",
1514 expected_refs, ancestors_refs
1515 );
1516 }
1517
1518 #[tokio::test]
1519 async fn test_link_causal_history() {
1520 let (mut context, _) = Context::new_for_test(4);
1521 context.parameters.dag_state_cached_rounds = 10;
1522 context
1523 .protocol_config
1524 .set_consensus_gc_depth_for_testing(3);
1525 let context = Arc::new(context);
1526
1527 let store = Arc::new(MemStore::new());
1528 let mut dag_state = DagState::new(context.clone(), store.clone());
1529
1530 let mut dag_builder = DagBuilder::new(context.clone());
1532 dag_builder.layers(1..=3).build();
1533 dag_builder
1534 .layers(4..=6)
1535 .authorities(vec![AuthorityIndex::new_for_test(0)])
1536 .skip_block()
1537 .build();
1538
1539 let all_blocks = dag_builder.all_blocks();
1541 dag_state.accept_blocks(all_blocks.clone());
1542
1543 for block in &all_blocks {
1545 assert!(!dag_state.is_hard_linked(&block.reference()));
1546 }
1547
1548 let round_1_block = &all_blocks[1];
1550 assert_eq!(round_1_block.round(), 1);
1551 let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1552
1553 assert_eq!(linked_blocks.len(), 1);
1555 assert_eq!(linked_blocks[0], round_1_block.reference());
1556 for block_ref in linked_blocks {
1557 assert!(dag_state.is_hard_linked(&block_ref));
1558 }
1559
1560 let round_2_block = &all_blocks[4];
1562 assert_eq!(round_2_block.round(), 2);
1563 let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1564
1565 assert_eq!(linked_blocks.len(), 4);
1567 for block_ref in linked_blocks {
1568 assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1569 }
1570
1571 for block in &all_blocks {
1573 if block.round() == 1 || block.reference() == round_2_block.reference() {
1574 assert!(dag_state.is_hard_linked(&block.reference()));
1575 } else {
1576 assert!(!dag_state.is_hard_linked(&block.reference()));
1577 }
1578 }
1579
1580 let round_6_block = all_blocks.last().unwrap();
1582 assert_eq!(round_6_block.round(), 6);
1583
1584 let last_commit = TrustedCommit::new_for_test(
1586 6,
1587 CommitDigest::MIN,
1588 context.clock.timestamp_utc_ms(),
1589 round_6_block.reference(),
1590 vec![],
1591 );
1592 dag_state.set_last_commit(last_commit);
1593 assert_eq!(
1594 dag_state.gc_round(),
1595 3,
1596 "GC round should have moved to round 3"
1597 );
1598
1599 let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1601
1602 assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1604 for block_ref in linked_blocks {
1605 assert!(
1606 block_ref.round == 4
1607 || block_ref.round == 5
1608 || block_ref == round_6_block.reference()
1609 );
1610 }
1611
1612 for block in &all_blocks {
1614 let block_ref = block.reference();
1615 if block.round() == 1
1616 || block_ref == round_2_block.reference()
1617 || block_ref.round == 4
1618 || block_ref.round == 5
1619 || block_ref == round_6_block.reference()
1620 {
1621 assert!(dag_state.is_hard_linked(&block.reference()));
1622 } else {
1623 assert!(!dag_state.is_hard_linked(&block.reference()));
1624 }
1625 }
1626 }
1627
1628 #[tokio::test]
1629 async fn test_contains_blocks_in_cache_or_store() {
1630 const CACHED_ROUNDS: Round = 2;
1632
1633 let (mut context, _) = Context::new_for_test(4);
1634 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1635
1636 let context = Arc::new(context);
1637 let store = Arc::new(MemStore::new());
1638 let mut dag_state = DagState::new(context.clone(), store.clone());
1639
1640 let num_rounds: u32 = 10;
1642 let num_authorities: u32 = 4;
1643 let mut blocks = Vec::new();
1644
1645 for round in 1..=num_rounds {
1646 for author in 0..num_authorities {
1647 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1648 blocks.push(block);
1649 }
1650 }
1651
1652 blocks.clone().into_iter().for_each(|block| {
1654 if block.round() <= 4 {
1655 store
1656 .write(WriteBatch::default().blocks(vec![block]))
1657 .unwrap();
1658 } else {
1659 dag_state.accept_blocks(vec![block]);
1660 }
1661 });
1662
1663 let mut block_refs = blocks
1666 .iter()
1667 .map(|block| block.reference())
1668 .collect::<Vec<_>>();
1669 let result = dag_state.contains_blocks(block_refs.clone());
1670
1671 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1673 assert_eq!(result, expected);
1674
1675 block_refs.insert(
1677 3,
1678 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1679 );
1680 let result = dag_state.contains_blocks(block_refs.clone());
1681
1682 expected.insert(3, false);
1684 assert_eq!(result, expected.clone());
1685 }
1686
1687 #[tokio::test]
1688 async fn test_contains_cached_block_at_slot() {
1689 const CACHED_ROUNDS: Round = 2;
1691
1692 let num_authorities: u32 = 4;
1693 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1694 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1695
1696 let context = Arc::new(context);
1697 let store = Arc::new(MemStore::new());
1698 let mut dag_state = DagState::new(context.clone(), store.clone());
1699
1700 let num_rounds: u32 = 10;
1702 let mut blocks = Vec::new();
1703
1704 for round in 1..=num_rounds {
1705 for author in 0..num_authorities {
1706 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1707 blocks.push(block.clone());
1708 dag_state.accept_block(block);
1709 }
1710 }
1711
1712 for (author, _) in context.committee.authorities() {
1714 assert!(
1715 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1716 "Genesis should always be found"
1717 );
1718 }
1719
1720 let mut block_refs = blocks
1723 .iter()
1724 .map(|block| block.reference())
1725 .collect::<Vec<_>>();
1726
1727 for block_ref in block_refs.clone() {
1728 let slot = block_ref.into();
1729 let found = dag_state.contains_cached_block_at_slot(slot);
1730 assert!(found, "A block should be found at slot {}", slot);
1731 }
1732
1733 block_refs.insert(
1736 3,
1737 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1738 );
1739 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1740 expected.insert(3, false);
1741
1742 for block_ref in block_refs {
1744 let slot = block_ref.into();
1745 let found = dag_state.contains_cached_block_at_slot(slot);
1746
1747 assert_eq!(expected.remove(0), found);
1748 }
1749 }
1750
1751 #[tokio::test]
1752 #[ignore]
1753 #[should_panic(
1754 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1755 )]
1756 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1757 const GC_DEPTH: u32 = 2;
1760 const CACHED_ROUNDS: Round = 3;
1762
1763 let (mut context, _) = Context::new_for_test(4);
1764 context
1765 .protocol_config
1766 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1767 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1768
1769 let context = Arc::new(context);
1770 let store = Arc::new(MemStore::new());
1771 let mut dag_state = DagState::new(context.clone(), store.clone());
1772
1773 let mut dag_builder = DagBuilder::new(context.clone());
1775 dag_builder.layers(1..=3).build();
1776 dag_builder
1777 .layers(4..=6)
1778 .authorities(vec![AuthorityIndex::new_for_test(0)])
1779 .skip_block()
1780 .build();
1781
1782 dag_builder
1784 .all_blocks()
1785 .into_iter()
1786 .for_each(|block| dag_state.accept_block(block));
1787
1788 dag_state.add_commit(TrustedCommit::new_for_test(
1790 1 as CommitIndex,
1791 CommitDigest::MIN,
1792 0,
1793 dag_builder.leader_block(5).unwrap().reference(),
1794 vec![],
1795 ));
1796 dag_state.flush();
1798
1799 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1801
1802 for authority_index in 1..=3 {
1806 for round in 4..=6 {
1807 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1808 round,
1809 AuthorityIndex::new_for_test(authority_index)
1810 )));
1811 }
1812 }
1813
1814 for round in 1..=3 {
1815 assert!(
1816 dag_state.contains_cached_block_at_slot(Slot::new(
1817 round,
1818 AuthorityIndex::new_for_test(0)
1819 ))
1820 );
1821 }
1822
1823 let _ =
1826 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1827 }
1828
1829 #[tokio::test]
1830 async fn test_get_blocks_in_cache_or_store() {
1831 let (context, _) = Context::new_for_test(4);
1832 let context = Arc::new(context);
1833 let store = Arc::new(MemStore::new());
1834 let mut dag_state = DagState::new(context.clone(), store.clone());
1835
1836 let num_rounds: u32 = 10;
1838 let num_authorities: u32 = 4;
1839 let mut blocks = Vec::new();
1840
1841 for round in 1..=num_rounds {
1842 for author in 0..num_authorities {
1843 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1844 blocks.push(block);
1845 }
1846 }
1847
1848 blocks.clone().into_iter().for_each(|block| {
1850 if block.round() <= 4 {
1851 store
1852 .write(WriteBatch::default().blocks(vec![block]))
1853 .unwrap();
1854 } else {
1855 dag_state.accept_blocks(vec![block]);
1856 }
1857 });
1858
1859 let mut block_refs = blocks
1862 .iter()
1863 .map(|block| block.reference())
1864 .collect::<Vec<_>>();
1865 let result = dag_state.get_blocks(&block_refs);
1866
1867 let mut expected = blocks
1868 .into_iter()
1869 .map(Some)
1870 .collect::<Vec<Option<VerifiedBlock>>>();
1871
1872 assert_eq!(result, expected.clone());
1874
1875 block_refs.insert(
1877 3,
1878 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1879 );
1880 let result = dag_state.get_blocks(&block_refs);
1881
1882 expected.insert(3, None);
1884 assert_eq!(result, expected);
1885 }
1886
1887 #[tokio::test]
1888 async fn test_flush_and_recovery() {
1889 telemetry_subscribers::init_for_testing();
1890
1891 const GC_DEPTH: u32 = 3;
1892 const CACHED_ROUNDS: u32 = 4;
1893
1894 let num_authorities: u32 = 4;
1895 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1896 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1897 context
1898 .protocol_config
1899 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1900
1901 let context = Arc::new(context);
1902
1903 let store = Arc::new(MemStore::new());
1904 let mut dag_state = DagState::new(context.clone(), store.clone());
1905
1906 const NUM_ROUNDS: Round = 20;
1907 let mut dag_builder = DagBuilder::new(context.clone());
1908 dag_builder.layers(1..=5).build();
1909 dag_builder
1910 .layers(6..=8)
1911 .authorities(vec![AuthorityIndex::new_for_test(0)])
1912 .skip_block()
1913 .build();
1914 dag_builder.layers(9..=NUM_ROUNDS).build();
1915
1916 const LAST_COMMIT_ROUND: Round = 16;
1918 const LAST_COMMIT_INDEX: CommitIndex = 15;
1919 let commits = dag_builder
1920 .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1921 .into_iter()
1922 .map(|(_subdag, commit)| commit)
1923 .take(LAST_COMMIT_INDEX as usize)
1924 .collect::<Vec<_>>();
1925 assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1926 assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1927
1928 const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1932 const NUM_PERSISTED_COMMITS: usize = 8;
1933 const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1934 const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1935 dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1936 let mut finalized_commits = vec![];
1937 for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1938 finalized_commits.push(commit.clone());
1939 dag_state.add_commit(commit);
1940 }
1941 let last_finalized_commit = finalized_commits.last().unwrap();
1942 assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1943 assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1944
1945 let finalized_blocks = finalized_commits
1947 .iter()
1948 .flat_map(|commit| commit.blocks())
1949 .collect::<BTreeSet<_>>();
1950
1951 dag_state.flush();
1953
1954 let store_blocks = store
1956 .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1957 .unwrap();
1958 assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1959 let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1960 assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1961 assert_eq!(
1962 store_commits.last().unwrap().index(),
1963 LAST_PERSISTED_COMMIT_INDEX
1964 );
1965 assert_eq!(
1966 store_commits.last().unwrap().round(),
1967 LAST_PERSISTED_COMMIT_ROUND
1968 );
1969
1970 dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1972 for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1973 dag_state.add_commit(commit);
1974 }
1975
1976 let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
1978 let block_refs = all_blocks
1979 .iter()
1980 .map(|block| block.reference())
1981 .collect::<Vec<_>>();
1982 let result = dag_state
1983 .get_blocks(&block_refs)
1984 .into_iter()
1985 .map(|b| b.unwrap())
1986 .collect::<Vec<_>>();
1987 assert_eq!(result, all_blocks);
1988
1989 assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
1991
1992 drop(dag_state);
1994
1995 let dag_state = DagState::new(context.clone(), store.clone());
1997
1998 let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2000 let block_refs = all_blocks
2001 .iter()
2002 .map(|block| block.reference())
2003 .collect::<Vec<_>>();
2004 let result = dag_state
2005 .get_blocks(&block_refs)
2006 .into_iter()
2007 .map(|b| b.unwrap())
2008 .collect::<Vec<_>>();
2009 assert_eq!(result, all_blocks);
2010
2011 let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2013 let block_refs = missing_blocks
2014 .iter()
2015 .map(|block| block.reference())
2016 .collect::<Vec<_>>();
2017 let retrieved_blocks = dag_state
2018 .get_blocks(&block_refs)
2019 .into_iter()
2020 .flatten()
2021 .collect::<Vec<_>>();
2022 assert!(retrieved_blocks.is_empty());
2023
2024 assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2026 assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2027
2028 let expected_last_committed_rounds = vec![5, 9, 8, 8];
2030 assert_eq!(
2031 dag_state.last_committed_rounds(),
2032 expected_last_committed_rounds
2033 );
2034 assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2036
2037 for (authority_index, _) in context.committee.authorities() {
2039 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2040
2041 if authority_index == AuthorityIndex::new_for_test(0) {
2045 assert_eq!(blocks.len(), 4);
2046 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2047 assert!(
2048 blocks
2049 .into_iter()
2050 .all(|block| block.round() >= 7 && block.round() <= 12)
2051 );
2052 } else {
2053 assert_eq!(blocks.len(), 6);
2054 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2055 assert!(
2056 blocks
2057 .into_iter()
2058 .all(|block| block.round() >= 7 && block.round() <= 12)
2059 );
2060 }
2061 }
2062
2063 let gc_round = dag_state.gc_round();
2065 assert_eq!(gc_round, 6);
2066 dag_state
2067 .recent_blocks
2068 .iter()
2069 .for_each(|(block_ref, block_info)| {
2070 if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2071 assert!(
2072 block_info.committed,
2073 "Block {:?} should be set as committed",
2074 block_ref
2075 );
2076 }
2077 });
2078
2079 dag_state
2084 .recent_blocks
2085 .iter()
2086 .for_each(|(block_ref, block_info)| {
2087 if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2088 assert!(block_info.hard_linked);
2089 } else {
2090 assert!(!block_info.hard_linked);
2091 }
2092 });
2093 }
2094
2095 #[tokio::test]
2096 async fn test_block_info_as_committed() {
2097 let num_authorities: u32 = 4;
2098 let (context, _) = Context::new_for_test(num_authorities as usize);
2099 let context = Arc::new(context);
2100
2101 let store = Arc::new(MemStore::new());
2102 let mut dag_state = DagState::new(context.clone(), store.clone());
2103
2104 let block = VerifiedBlock::new_for_test(
2106 TestBlock::new(1, 0)
2107 .set_timestamp_ms(1000)
2108 .set_ancestors(vec![])
2109 .build(),
2110 );
2111
2112 dag_state.accept_block(block.clone());
2113
2114 assert!(!dag_state.is_committed(&block.reference()));
2116
2117 assert!(
2119 dag_state.set_committed(&block.reference()),
2120 "Block should be successfully set as committed for first time"
2121 );
2122
2123 assert!(dag_state.is_committed(&block.reference()));
2125
2126 assert!(
2128 !dag_state.set_committed(&block.reference()),
2129 "Block should not be successfully set as committed"
2130 );
2131 }
2132
2133 #[tokio::test]
2134 async fn test_get_cached_blocks() {
2135 let (mut context, _) = Context::new_for_test(4);
2136 context.parameters.dag_state_cached_rounds = 5;
2137
2138 let context = Arc::new(context);
2139 let store = Arc::new(MemStore::new());
2140 let mut dag_state = DagState::new(context.clone(), store.clone());
2141
2142 let mut all_blocks = Vec::new();
2147 for author in 1..=3 {
2148 for round in 10..(10 + author) {
2149 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2150 all_blocks.push(block.clone());
2151 dag_state.accept_block(block);
2152 }
2153 }
2154
2155 let cached_blocks =
2158 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2159 assert!(cached_blocks.is_empty());
2160
2161 let cached_blocks =
2162 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2163 assert_eq!(cached_blocks.len(), 1);
2164 assert_eq!(cached_blocks[0].round(), 10);
2165
2166 let cached_blocks =
2167 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2168 assert_eq!(cached_blocks.len(), 2);
2169 assert_eq!(cached_blocks[0].round(), 10);
2170 assert_eq!(cached_blocks[1].round(), 11);
2171
2172 let cached_blocks =
2173 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2174 assert_eq!(cached_blocks.len(), 1);
2175 assert_eq!(cached_blocks[0].round(), 11);
2176
2177 let cached_blocks =
2178 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2179 assert_eq!(cached_blocks.len(), 3);
2180 assert_eq!(cached_blocks[0].round(), 10);
2181 assert_eq!(cached_blocks[1].round(), 11);
2182 assert_eq!(cached_blocks[2].round(), 12);
2183
2184 let cached_blocks =
2185 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2186 assert_eq!(cached_blocks.len(), 1);
2187 assert_eq!(cached_blocks[0].round(), 12);
2188
2189 let cached_blocks = dag_state.get_cached_blocks_in_range(
2193 context.committee.to_authority_index(3).unwrap(),
2194 10,
2195 10,
2196 1,
2197 );
2198 assert!(cached_blocks.is_empty());
2199
2200 let cached_blocks = dag_state.get_cached_blocks_in_range(
2202 context.committee.to_authority_index(3).unwrap(),
2203 11,
2204 10,
2205 1,
2206 );
2207 assert!(cached_blocks.is_empty());
2208
2209 let cached_blocks = dag_state.get_cached_blocks_in_range(
2211 context.committee.to_authority_index(0).unwrap(),
2212 9,
2213 10,
2214 1,
2215 );
2216 assert!(cached_blocks.is_empty());
2217
2218 let cached_blocks = dag_state.get_cached_blocks_in_range(
2220 context.committee.to_authority_index(1).unwrap(),
2221 9,
2222 11,
2223 1,
2224 );
2225 assert_eq!(cached_blocks.len(), 1);
2226 assert_eq!(cached_blocks[0].round(), 10);
2227
2228 let cached_blocks = dag_state.get_cached_blocks_in_range(
2230 context.committee.to_authority_index(2).unwrap(),
2231 9,
2232 12,
2233 5,
2234 );
2235 assert_eq!(cached_blocks.len(), 2);
2236 assert_eq!(cached_blocks[0].round(), 10);
2237 assert_eq!(cached_blocks[1].round(), 11);
2238
2239 let cached_blocks = dag_state.get_cached_blocks_in_range(
2241 context.committee.to_authority_index(3).unwrap(),
2242 11,
2243 20,
2244 5,
2245 );
2246 assert_eq!(cached_blocks.len(), 2);
2247 assert_eq!(cached_blocks[0].round(), 11);
2248 assert_eq!(cached_blocks[1].round(), 12);
2249
2250 let cached_blocks = dag_state.get_cached_blocks_in_range(
2252 context.committee.to_authority_index(3).unwrap(),
2253 10,
2254 20,
2255 1,
2256 );
2257 assert_eq!(cached_blocks.len(), 1);
2258 assert_eq!(cached_blocks[0].round(), 10);
2259 }
2260
2261 #[tokio::test]
2262 async fn test_get_last_cached_block() {
2263 const CACHED_ROUNDS: Round = 2;
2265 const GC_DEPTH: u32 = 1;
2266 let (mut context, _) = Context::new_for_test(4);
2267 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2268 context
2269 .protocol_config
2270 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2271
2272 let context = Arc::new(context);
2273 let store = Arc::new(MemStore::new());
2274 let mut dag_state = DagState::new(context.clone(), store.clone());
2275
2276 let dag_str = "DAG {
2281 Round 0 : { 4 },
2282 Round 1 : {
2283 B -> [*],
2284 C -> [*],
2285 D -> [*],
2286 },
2287 Round 2 : {
2288 C -> [*],
2289 D -> [*],
2290 },
2291 Round 3 : {
2292 D -> [*],
2293 },
2294 }";
2295
2296 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2297
2298 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2300
2301 for block in dag_builder
2303 .all_blocks()
2304 .into_iter()
2305 .chain(std::iter::once(block))
2306 {
2307 dag_state.accept_block(block);
2308 }
2309
2310 dag_state.add_commit(TrustedCommit::new_for_test(
2311 1 as CommitIndex,
2312 CommitDigest::MIN,
2313 context.clock.timestamp_utc_ms(),
2314 dag_builder.leader_block(3).unwrap().reference(),
2315 vec![],
2316 ));
2317
2318 let end_round = 4;
2320 let expected_rounds = vec![0, 1, 2, 3];
2321 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2322 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2324 assert_eq!(
2325 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2326 expected_rounds
2327 );
2328 assert_eq!(
2329 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2330 expected_excluded_and_equivocating_blocks
2331 );
2332
2333 for (i, expected_round) in expected_rounds.iter().enumerate() {
2335 let round = dag_state
2336 .get_last_cached_block_in_range(
2337 context.committee.to_authority_index(i).unwrap(),
2338 0,
2339 end_round,
2340 )
2341 .map(|b| b.round())
2342 .unwrap_or_default();
2343 assert_eq!(round, *expected_round, "Authority {i}");
2344 }
2345
2346 let start_round = 2;
2348 let expected_rounds = [0, 0, 2, 3];
2349
2350 for (i, expected_round) in expected_rounds.iter().enumerate() {
2352 let round = dag_state
2353 .get_last_cached_block_in_range(
2354 context.committee.to_authority_index(i).unwrap(),
2355 start_round,
2356 end_round,
2357 )
2358 .map(|b| b.round())
2359 .unwrap_or_default();
2360 assert_eq!(round, *expected_round, "Authority {i}");
2361 }
2362
2363 dag_state.flush();
2369
2370 let end_round = 3;
2372 let expected_rounds = vec![0, 1, 2, 2];
2373
2374 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2376 assert_eq!(
2377 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2378 expected_rounds
2379 );
2380
2381 for (i, expected_round) in expected_rounds.iter().enumerate() {
2383 let round = dag_state
2384 .get_last_cached_block_in_range(
2385 context.committee.to_authority_index(i).unwrap(),
2386 0,
2387 end_round,
2388 )
2389 .map(|b| b.round())
2390 .unwrap_or_default();
2391 assert_eq!(round, *expected_round, "Authority {i}");
2392 }
2393 }
2394
2395 #[tokio::test]
2396 #[should_panic(
2397 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2398 )]
2399 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2400 const CACHED_ROUNDS: Round = 1;
2402 const GC_DEPTH: u32 = 1;
2403 let (mut context, _) = Context::new_for_test(4);
2404 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2405 context
2406 .protocol_config
2407 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2408
2409 let context = Arc::new(context);
2410 let store = Arc::new(MemStore::new());
2411 let mut dag_state = DagState::new(context.clone(), store.clone());
2412
2413 let mut dag_builder = DagBuilder::new(context.clone());
2418 dag_builder
2419 .layers(1..=1)
2420 .authorities(vec![AuthorityIndex::new_for_test(0)])
2421 .skip_block()
2422 .build();
2423 dag_builder
2424 .layers(2..=2)
2425 .authorities(vec![
2426 AuthorityIndex::new_for_test(0),
2427 AuthorityIndex::new_for_test(1),
2428 ])
2429 .skip_block()
2430 .build();
2431 dag_builder
2432 .layers(3..=3)
2433 .authorities(vec![
2434 AuthorityIndex::new_for_test(0),
2435 AuthorityIndex::new_for_test(1),
2436 AuthorityIndex::new_for_test(2),
2437 ])
2438 .skip_block()
2439 .build();
2440
2441 for block in dag_builder.all_blocks() {
2443 dag_state.accept_block(block);
2444 }
2445
2446 dag_state.add_commit(TrustedCommit::new_for_test(
2447 1 as CommitIndex,
2448 CommitDigest::MIN,
2449 0,
2450 dag_builder.leader_block(3).unwrap().reference(),
2451 vec![],
2452 ));
2453
2454 dag_state.flush();
2456
2457 dag_state.get_last_cached_block_per_authority(2);
2459 }
2460
2461 #[tokio::test]
2462 async fn test_last_quorum() {
2463 let (context, _) = Context::new_for_test(4);
2465 let context = Arc::new(context);
2466 let store = Arc::new(MemStore::new());
2467 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2468
2469 {
2471 let genesis = genesis_blocks(context.as_ref());
2472
2473 assert_eq!(dag_state.read().last_quorum(), genesis);
2474 }
2475
2476 {
2478 let mut dag_builder = DagBuilder::new(context.clone());
2479 dag_builder
2480 .layers(1..=4)
2481 .build()
2482 .persist_layers(dag_state.clone());
2483 let round_4_blocks: Vec<_> = dag_builder
2484 .blocks(4..=4)
2485 .into_iter()
2486 .map(|block| block.reference())
2487 .collect();
2488
2489 let last_quorum = dag_state.read().last_quorum();
2490
2491 assert_eq!(
2492 last_quorum
2493 .into_iter()
2494 .map(|block| block.reference())
2495 .collect::<Vec<_>>(),
2496 round_4_blocks
2497 );
2498 }
2499
2500 {
2502 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2503 dag_state.write().accept_block(block);
2504
2505 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2506
2507 let last_quorum = dag_state.read().last_quorum();
2508
2509 assert_eq!(last_quorum, round_4_blocks);
2510 }
2511 }
2512
2513 #[tokio::test]
2514 async fn test_last_block_for_authority() {
2515 let (context, _) = Context::new_for_test(4);
2517 let context = Arc::new(context);
2518 let store = Arc::new(MemStore::new());
2519 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2520
2521 {
2523 let genesis = genesis_blocks(context.as_ref());
2524 let my_genesis = genesis
2525 .into_iter()
2526 .find(|block| block.author() == context.own_index)
2527 .unwrap();
2528
2529 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2530 }
2531
2532 {
2534 let mut dag_builder = DagBuilder::new(context.clone());
2536 dag_builder
2537 .layers(1..=4)
2538 .build()
2539 .persist_layers(dag_state.clone());
2540
2541 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2543 dag_state.write().accept_block(block);
2544
2545 let block = dag_state
2546 .read()
2547 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2548 assert_eq!(block.round(), 5);
2549
2550 for (authority_index, _) in context.committee.authorities() {
2551 let block = dag_state
2552 .read()
2553 .get_last_block_for_authority(authority_index);
2554
2555 if authority_index.value() == 0 {
2556 assert_eq!(block.round(), 5);
2557 } else {
2558 assert_eq!(block.round(), 4);
2559 }
2560 }
2561 }
2562 }
2563
2564 #[tokio::test]
2565 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2566 let (context, _) = Context::new_for_test(4);
2568 let context = Arc::new(context);
2569 let store = Arc::new(MemStore::new());
2570 let mut dag_state = DagState::new(context.clone(), store.clone());
2571
2572 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2574
2575 let block = VerifiedBlock::new_for_test(
2576 TestBlock::new(10, 0)
2577 .set_timestamp_ms(block_timestamp)
2578 .build(),
2579 );
2580
2581 dag_state.accept_block(block);
2583 }
2584
2585 #[tokio::test]
2586 async fn test_last_finalized_commit() {
2587 let (context, _) = Context::new_for_test(4);
2589 let context = Arc::new(context);
2590 let store = Arc::new(MemStore::new());
2591 let mut dag_state = DagState::new(context.clone(), store.clone());
2592
2593 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2595 let rejected_transactions = BTreeMap::new();
2596 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2597
2598 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2600 assert_eq!(
2601 dag_state.finalized_commits_to_write[0],
2602 (commit_ref, rejected_transactions.clone())
2603 );
2604
2605 dag_state.flush();
2607
2608 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2610 assert_eq!(last_finalized_commit, Some(commit_ref));
2611 let stored_rejected_transactions = store
2612 .read_rejected_transactions(commit_ref)
2613 .unwrap()
2614 .unwrap();
2615 assert_eq!(stored_rejected_transactions, rejected_transactions);
2616 }
2617}