1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21 CommittedSubDag,
22 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23 commit::{
24 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26 },
27 context::Context,
28 leader_scoring::{ReputationScores, ScoringSubdag},
29 storage::{Store, WriteBatch},
30 threshold_clock::ThresholdClock,
31};
32
/// In-memory view of the DAG backed by a persistent [`Store`].
///
/// Accepted blocks and commits are cached here and buffered for writing; the
/// buffers are persisted, and old cached blocks evicted, by `flush()`.
pub struct DagState {
    /// Shared context providing the committee, metrics, clock and configuration.
    context: Arc<Context>,

    /// Genesis blocks keyed by their reference, built from `genesis_blocks()`.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    /// Cached blocks together with their committed / included flags.
    /// Entries are added when blocks are accepted or recovered and removed in `flush()`.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    /// References of the blocks in `recent_blocks`, grouped per authority
    /// (indexed by authority) and kept sorted.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    /// Threshold clock fed with every block added to the cache.
    threshold_clock: ThresholdClock,

    /// Per authority, the eviction round computed at construction and on each `flush()`;
    /// cached blocks with round <= this value are dropped from the cache.
    evicted_rounds: Vec<Round>,

    /// Highest round among all blocks added to the cache so far.
    highest_accepted_round: Round,

    /// Last commit added via `add_commit()` or recovered from the store, if any.
    last_commit: Option<TrustedCommit>,

    /// Wall-clock instant at which the commit round last advanced; used to
    /// report the interval between commit round advancements.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    /// Per authority, the highest round of any committed block.
    last_committed_rounds: Vec<Round>,

    /// Committed subdags accumulated for reputation scoring.
    scoring_subdag: ScoringSubdag,

    /// Commit references queued by `add_commit()` and drained by `take_commit_votes()`.
    pending_commit_votes: VecDeque<CommitVote>,

    /// Blocks buffered until the next `flush()`.
    blocks_to_write: Vec<VerifiedBlock>,
    /// Commits buffered until the next `flush()`.
    commits_to_write: Vec<TrustedCommit>,

    /// Commit infos buffered until the next `flush()`.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    /// Finalized commits, with their rejected transactions per block, buffered until the next `flush()`.
    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,

    /// Persistent storage for blocks and commits.
    store: Arc<dyn Store>,

    /// Number of rounds to keep cached per authority, read from
    /// `context.parameters.dag_state_cached_rounds`.
    cached_rounds: Round,
}
107
108impl DagState {
    /// Creates a `DagState`, recovering cached blocks, commit state and
    /// block commit / inclusion statuses from `store`.
    ///
    /// # Panics
    ///
    /// Panics if any read from `store` fails, or if a block is marked as
    /// committed twice while recovering commits.
    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
        let num_authorities = context.committee.size();

        let genesis = genesis_blocks(context.as_ref())
            .into_iter()
            .map(|block| (block.reference(), block))
            .collect();

        let threshold_clock = ThresholdClock::new(1, context.clone());

        let last_commit = store
            .read_last_commit()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));

        // Start from the last stored CommitInfo when there is one; otherwise
        // start with zeroed committed rounds and replay from the first commit.
        let commit_info = store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
        let (mut last_committed_rounds, commit_recovery_start_index) =
            if let Some((commit_ref, commit_info)) = commit_info {
                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
                (commit_info.committed_rounds, commit_ref.index + 1)
            } else {
                tracing::info!("Found no stored CommitInfo to recover from");
                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
            };

        let mut unscored_committed_subdags = Vec::new();
        let mut scoring_subdag = ScoringSubdag::new(context.clone());

        // Replay the commits after the stored CommitInfo, up to the last commit:
        // bring the committed rounds up to date and collect the subdags for scoring.
        if let Some(last_commit) = last_commit.as_ref() {
            store
                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
                .iter()
                .for_each(|commit| {
                    for block_ref in commit.blocks() {
                        last_committed_rounds[block_ref.author] =
                            max(last_committed_rounds[block_ref.author], block_ref.round);
                    }
                    let committed_subdag =
                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
                    unscored_committed_subdags.push(committed_subdag);
                });
        }

        tracing::info!(
            "DagState was initialized with the following state: \
            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
            unscored_committed_subdags.len()
        );

        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));

        let mut state = Self {
            context: context.clone(),
            genesis,
            recent_blocks: BTreeMap::new(),
            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
            threshold_clock,
            highest_accepted_round: 0,
            last_commit: last_commit.clone(),
            last_commit_round_advancement_time: None,
            last_committed_rounds: last_committed_rounds.clone(),
            pending_commit_votes: VecDeque::new(),
            blocks_to_write: vec![],
            commits_to_write: vec![],
            commit_info_to_write: vec![],
            finalized_commits_to_write: vec![],
            scoring_subdag,
            store: store.clone(),
            cached_rounds,
            evicted_rounds: vec![0; num_authorities],
        };

        // For every authority, load into the cache the stored blocks above its eviction round.
        for (authority_index, _) in context.committee.authorities() {
            let (blocks, eviction_round) = {
                // The authority's latest stored block determines its eviction round.
                let last_block = state
                    .store
                    .scan_last_blocks_by_author(authority_index, 1, None)
                    .expect("Database error");
                let last_block_round = last_block
                    .last()
                    .map(|b| b.round())
                    .unwrap_or(GENESIS_ROUND);

                let eviction_round =
                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");

                (blocks, eviction_round)
            };

            state.evicted_rounds[authority_index] = eviction_round;

            // Recovered blocks are already persisted, so only the in-memory
            // metadata is updated; they are not queued for writing again.
            for block in &blocks {
                state.update_block_metadata(block);
            }

            debug!(
                "Recovered blocks {}: {:?}",
                authority_index,
                blocks
                    .iter()
                    .map(|b| b.reference())
                    .collect::<Vec<BlockRef>>()
            );
        }

        // Walk the commits backwards from the last one, marking their blocks above
        // gc_round as committed in the cache.
        if let Some(last_commit) = last_commit {
            let mut index = last_commit.index();
            let gc_round = state.gc_round();
            info!(
                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                index, gc_round
            );

            loop {
                let commits = store
                    .scan_commits((index..=index).into())
                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
                let Some(commit) = commits.first() else {
                    info!("Recovering finished up to index {index}, no more commits to recover");
                    break;
                };

                // Stop once the commit leader is at or below the gc round (only when gc_round > 0).
                if gc_round > 0 && commit.leader().round <= gc_round {
                    info!(
                        "Recovering finished, reached commit leader round {} <= gc_round {}",
                        commit.leader().round,
                        gc_round
                    );
                    break;
                }

                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                    debug!(
                        "Setting block {:?} as committed based on commit {:?}",
                        block_ref,
                        commit.index()
                    );
                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
                });

                index = index.saturating_sub(1);
                // Index 0 is not scanned: the walk ends after processing index 1.
                if index == 0 {
                    break;
                }
            }
        }

        // Mark the causal history of our own blocks above gc_round as included.
        let proposed_blocks = store
            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
            .expect("Database error");
        for block in proposed_blocks {
            state.link_causal_history(block.reference());
        }

        state
    }
280
281 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283 assert_ne!(
284 block.round(),
285 0,
286 "Genesis block should not be accepted into DAG."
287 );
288
289 let block_ref = block.reference();
290 if self.contains_block(&block_ref) {
291 return;
292 }
293
294 let now = self.context.clock.timestamp_utc_ms();
295 if block.timestamp_ms() > now {
296 trace!(
297 "Block {:?} with timestamp {} is greater than local timestamp {}.",
298 block,
299 block.timestamp_ms(),
300 now,
301 );
302 }
303 let hostname = &self.context.committee.authority(block_ref.author).hostname;
304 self.context
305 .metrics
306 .node_metrics
307 .accepted_block_time_drift_ms
308 .with_label_values(&[hostname])
309 .inc_by(block.timestamp_ms().saturating_sub(now));
310
311 if block_ref.author == self.context.own_index {
314 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315 assert!(
316 existing_blocks.is_empty(),
317 "Block Rejected! Attempted to add block {block:#?} to own slot where \
318 block(s) {existing_blocks:#?} already exists."
319 );
320 }
321 self.update_block_metadata(&block);
322 self.blocks_to_write.push(block);
323 let source = if self.context.own_index == block_ref.author {
324 "own"
325 } else {
326 "others"
327 };
328 self.context
329 .metrics
330 .node_metrics
331 .accepted_blocks
332 .with_label_values(&[source])
333 .inc();
334 }
335
336 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338 let block_ref = block.reference();
339 self.recent_blocks
340 .insert(block_ref, BlockInfo::new(block.clone()));
341 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343 if self.threshold_clock.add_block(block_ref) {
344 let last_proposed_block = self.get_last_proposed_block();
346 if last_proposed_block.round() == block_ref.round {
347 let quorum_delay_ms = self
348 .context
349 .clock
350 .timestamp_utc_ms()
351 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352 self.context
353 .metrics
354 .node_metrics
355 .quorum_receive_latency
356 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357 }
358 }
359
360 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361 self.context
362 .metrics
363 .node_metrics
364 .highest_accepted_round
365 .set(self.highest_accepted_round as i64);
366
367 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368 .last()
369 .map(|block_ref| block_ref.round)
370 .expect("There should be by now at least one block ref");
371 let hostname = &self.context.committee.authority(block_ref.author).hostname;
372 self.context
373 .metrics
374 .node_metrics
375 .highest_accepted_authority_round
376 .with_label_values(&[hostname])
377 .set(highest_accepted_round_for_author as i64);
378 }
379
380 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382 debug!(
383 "Accepting blocks: {}",
384 blocks.iter().map(|b| b.reference().to_string()).join(",")
385 );
386 for block in blocks {
387 self.accept_block(block);
388 }
389 }
390
391 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394 self.get_blocks(&[*reference])
395 .pop()
396 .expect("Exactly one element should be returned")
397 }
398
399 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402 let mut blocks = vec![None; block_refs.len()];
403 let mut missing = Vec::new();
404
405 for (index, block_ref) in block_refs.iter().enumerate() {
406 if block_ref.round == GENESIS_ROUND {
407 if let Some(block) = self.genesis.get(block_ref) {
409 blocks[index] = Some(block.clone());
410 }
411 continue;
412 }
413 if let Some(block_info) = self.recent_blocks.get(block_ref) {
414 blocks[index] = Some(block_info.block.clone());
415 continue;
416 }
417 missing.push((index, block_ref));
418 }
419
420 if missing.is_empty() {
421 return blocks;
422 }
423
424 let missing_refs = missing
425 .iter()
426 .map(|(_, block_ref)| **block_ref)
427 .collect::<Vec<_>>();
428 let store_results = self
429 .store
430 .read_blocks(&missing_refs)
431 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432 self.context
433 .metrics
434 .node_metrics
435 .dag_state_store_read_count
436 .with_label_values(&["get_blocks"])
437 .inc();
438
439 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440 blocks[index] = result;
441 }
442
443 blocks
444 }
445
446 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449 let mut blocks = vec![];
453 for (_block_ref, block_info) in self.recent_blocks.range((
454 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456 )) {
457 blocks.push(block_info.block.clone())
458 }
459 blocks
460 }
461
462 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465 if round <= self.last_commit_round() {
466 panic!("Round {} have committed blocks!", round);
467 }
468
469 let mut blocks = vec![];
470 for (_block_ref, block_info) in self.recent_blocks.range((
471 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472 Excluded(BlockRef::new(
473 round + 1,
474 AuthorityIndex::ZERO,
475 BlockDigest::MIN,
476 )),
477 )) {
478 blocks.push(block_info.block.clone())
479 }
480 blocks
481 }
482
483 pub(crate) fn ancestors_at_round(
485 &self,
486 later_block: &VerifiedBlock,
487 earlier_round: Round,
488 ) -> Vec<VerifiedBlock> {
489 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491 while !linked.is_empty() {
492 let round = linked.last().unwrap().round;
493 if round <= earlier_round {
495 break;
496 }
497 let block_ref = linked.pop_last().unwrap();
498 let Some(block) = self.get_block(&block_ref) else {
499 panic!("Block {:?} should exist in DAG!", block_ref);
500 };
501 linked.extend(block.ancestors().iter().cloned());
502 }
503 linked
504 .range((
505 Included(BlockRef::new(
506 earlier_round,
507 AuthorityIndex::ZERO,
508 BlockDigest::MIN,
509 )),
510 Unbounded,
511 ))
512 .map(|r| {
513 self.get_block(r)
514 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515 .clone()
516 })
517 .collect()
518 }
519
    /// Returns the last block of this authority (`context.own_index`);
    /// see `get_last_block_for_authority()`.
    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
        self.get_last_block_for_authority(self.context.own_index)
    }
525
526 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529 if let Some(last) = self.recent_refs_by_authority[authority].last() {
530 return self
531 .recent_blocks
532 .get(last)
533 .expect("Block should be found in recent blocks")
534 .block
535 .clone();
536 }
537
538 let (_, genesis_block) = self
540 .genesis
541 .iter()
542 .find(|(block_ref, _)| block_ref.author == authority)
543 .expect("Genesis should be found for authority {authority_index}");
544 genesis_block.clone()
545 }
546
    /// Returns all cached blocks of `authority` from round `start` (inclusive)
    /// up to, but excluding, `Round::MAX`, in ascending order.
    pub(crate) fn get_cached_blocks(
        &self,
        authority: AuthorityIndex,
        start: Round,
    ) -> Vec<VerifiedBlock> {
        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
    }
559
560 pub(crate) fn get_cached_blocks_in_range(
563 &self,
564 authority: AuthorityIndex,
565 start_round: Round,
566 end_round: Round,
567 limit: usize,
568 ) -> Vec<VerifiedBlock> {
569 if start_round >= end_round || limit == 0 {
570 return vec![];
571 }
572
573 let mut blocks = vec![];
574 for block_ref in self.recent_refs_by_authority[authority].range((
575 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576 Excluded(BlockRef::new(
577 end_round,
578 AuthorityIndex::MIN,
579 BlockDigest::MIN,
580 )),
581 )) {
582 let block_info = self
583 .recent_blocks
584 .get(block_ref)
585 .expect("Block should exist in recent blocks");
586 blocks.push(block_info.block.clone());
587 if blocks.len() >= limit {
588 break;
589 }
590 }
591 blocks
592 }
593
594 pub(crate) fn get_last_cached_block_in_range(
596 &self,
597 authority: AuthorityIndex,
598 start_round: Round,
599 end_round: Round,
600 ) -> Option<VerifiedBlock> {
601 if start_round >= end_round {
602 return None;
603 }
604
605 let block_ref = self.recent_refs_by_authority[authority]
606 .range((
607 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608 Excluded(BlockRef::new(
609 end_round,
610 AuthorityIndex::MIN,
611 BlockDigest::MIN,
612 )),
613 ))
614 .last()?;
615
616 self.recent_blocks
617 .get(block_ref)
618 .map(|block_info| block_info.block.clone())
619 }
620
621 pub(crate) fn get_last_cached_block_per_authority(
628 &self,
629 end_round: Round,
630 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635 if end_round == GENESIS_ROUND {
636 panic!(
637 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638 );
639 }
640
641 if end_round == GENESIS_ROUND + 1 {
642 return blocks.into_iter().map(|b| (b, vec![])).collect();
643 }
644
645 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646 let authority_index = self
647 .context
648 .committee
649 .to_authority_index(authority_index)
650 .unwrap();
651
652 let last_evicted_round = self.evicted_rounds[authority_index];
653 if end_round.saturating_sub(1) <= last_evicted_round {
654 panic!(
655 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656 );
657 }
658
659 let block_ref_iter = block_refs
660 .range((
661 Included(BlockRef::new(
662 last_evicted_round + 1,
663 authority_index,
664 BlockDigest::MIN,
665 )),
666 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667 ))
668 .rev();
669
670 let mut last_round = 0;
671 for block_ref in block_ref_iter {
672 if last_round == 0 {
673 last_round = block_ref.round;
674 let block_info = self
675 .recent_blocks
676 .get(block_ref)
677 .expect("Block should exist in recent blocks");
678 blocks[authority_index] = block_info.block.clone();
679 continue;
680 }
681 if block_ref.round < last_round {
682 break;
683 }
684 equivocating_blocks[authority_index].push(*block_ref);
685 }
686 }
687
688 blocks.into_iter().zip(equivocating_blocks).collect()
689 }
690
691 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694 if slot.round == GENESIS_ROUND {
696 return true;
697 }
698
699 let eviction_round = self.evicted_rounds[slot.authority];
700 if slot.round <= eviction_round {
701 panic!(
702 "{}",
703 format!(
704 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705 )
706 );
707 }
708
709 let mut result = self.recent_refs_by_authority[slot.authority].range((
710 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712 ));
713 result.next().is_some()
714 }
715
716 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719 let mut exist = vec![false; block_refs.len()];
720 let mut missing = Vec::new();
721
722 for (index, block_ref) in block_refs.into_iter().enumerate() {
723 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725 exist[index] = true;
726 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727 {
728 exist[index] = false;
732 } else {
733 missing.push((index, block_ref));
734 }
735 }
736
737 if missing.is_empty() {
738 return exist;
739 }
740
741 let missing_refs = missing
742 .iter()
743 .map(|(_, block_ref)| *block_ref)
744 .collect::<Vec<_>>();
745 let store_results = self
746 .store
747 .contains_blocks(&missing_refs)
748 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749 self.context
750 .metrics
751 .node_metrics
752 .dag_state_store_read_count
753 .with_label_values(&["contains_blocks"])
754 .inc();
755
756 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757 exist[index] = result;
758 }
759
760 exist
761 }
762
763 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764 let blocks = self.contains_blocks(vec![*block_ref]);
765 blocks.first().cloned().unwrap()
766 }
767
768 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772 if !block_info.committed {
773 block_info.committed = true;
774 return true;
775 }
776 false
777 } else {
778 panic!(
779 "Block {:?} not found in cache to set as committed.",
780 block_ref
781 );
782 }
783 }
784
785 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787 self.recent_blocks
788 .get(block_ref)
789 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790 .committed
791 }
792
793 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
797 let gc_round = self.gc_round();
798 let mut linked_blocks = vec![];
799 let mut targets = VecDeque::new();
800 targets.push_back(root_block);
801 while let Some(block_ref) = targets.pop_front() {
802 if block_ref.round <= gc_round {
804 continue;
805 }
806 let block_info = self
807 .recent_blocks
808 .get_mut(&block_ref)
809 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
810 if block_info.included {
811 continue;
812 }
813 linked_blocks.push(block_ref);
814 block_info.included = true;
815 targets.extend(block_info.block.ancestors().iter());
816 }
817 linked_blocks
818 }
819
820 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
823 self.recent_blocks
824 .get(block_ref)
825 .unwrap_or_else(|| {
826 panic!(
827 "Attempted to query for inclusion status for a block not in cached data {}",
828 block_ref
829 )
830 })
831 .included
832 }
833
    /// Returns the current round of the threshold clock.
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }
837
    /// Returns the quorum timestamp reported by the threshold clock.
    // NOTE(review): presumably the instant at which the clock last advanced on a quorum — confirm in ThresholdClock.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }
842
    /// Returns the highest round among all blocks added to the cache so far.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
846
847 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
850 let time_diff = if let Some(last_commit) = &self.last_commit {
851 if commit.index() <= last_commit.index() {
852 error!(
853 "New commit index {} <= last commit index {}!",
854 commit.index(),
855 last_commit.index()
856 );
857 return;
858 }
859 assert_eq!(commit.index(), last_commit.index() + 1);
860
861 if commit.timestamp_ms() < last_commit.timestamp_ms() {
862 panic!(
863 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
864 last_commit, commit
865 );
866 }
867 commit
868 .timestamp_ms()
869 .saturating_sub(last_commit.timestamp_ms())
870 } else {
871 assert_eq!(commit.index(), 1);
872 0
873 };
874
875 self.context
876 .metrics
877 .node_metrics
878 .last_commit_time_diff
879 .observe(time_diff as f64);
880
881 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
882 previous_commit.round() < commit.round()
883 } else {
884 true
885 };
886
887 self.last_commit = Some(commit.clone());
888
889 if commit_round_advanced {
890 let now = std::time::Instant::now();
891 if let Some(previous_time) = self.last_commit_round_advancement_time {
892 self.context
893 .metrics
894 .node_metrics
895 .commit_round_advancement_interval
896 .observe(now.duration_since(previous_time).as_secs_f64())
897 }
898 self.last_commit_round_advancement_time = Some(now);
899 }
900
901 for block_ref in commit.blocks().iter() {
902 self.last_committed_rounds[block_ref.author] = max(
903 self.last_committed_rounds[block_ref.author],
904 block_ref.round,
905 );
906 }
907
908 for (i, round) in self.last_committed_rounds.iter().enumerate() {
909 let index = self.context.committee.to_authority_index(i).unwrap();
910 let hostname = &self.context.committee.authority(index).hostname;
911 self.context
912 .metrics
913 .node_metrics
914 .last_committed_authority_round
915 .with_label_values(&[hostname])
916 .set((*round).into());
917 }
918
919 self.pending_commit_votes.push_back(commit.reference());
920 self.commits_to_write.push(commit);
921 }
922
    /// Appends `commits` to the buffer of commits written on the next flush.
    /// No other state (last commit, committed rounds, votes) is updated.
    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
        self.commits_to_write.extend(commits);
    }
927
    /// Asserts that no commits are buffered for writing.
    ///
    /// # Panics
    ///
    /// Panics if `commits_to_write` is not empty.
    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
        assert!(
            self.commits_to_write.is_empty(),
            "Commits to write should be empty. {:?}",
            self.commits_to_write,
        );
    }
935
936 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
937 assert!(self.scoring_subdag.is_empty());
941
942 let commit_info = CommitInfo {
943 committed_rounds: self.last_committed_rounds.clone(),
944 reputation_scores,
945 };
946 let last_commit = self
947 .last_commit
948 .as_ref()
949 .expect("Last commit should already be set.");
950 self.commit_info_to_write
951 .push((last_commit.reference(), commit_info));
952 }
953
    /// Queues a finalized commit, with its rejected transactions per block,
    /// to be written on the next flush.
    pub(crate) fn add_finalized_commit(
        &mut self,
        commit_ref: CommitRef,
        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
    ) {
        self.finalized_commits_to_write
            .push((commit_ref, rejected_transactions));
    }
962
963 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
964 let mut votes = Vec::new();
965 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
966 votes.push(self.pending_commit_votes.pop_front().unwrap());
967 }
968 votes
969 }
970
971 pub(crate) fn last_commit_index(&self) -> CommitIndex {
973 match &self.last_commit {
974 Some(commit) => commit.index(),
975 None => 0,
976 }
977 }
978
979 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
981 match &self.last_commit {
982 Some(commit) => commit.digest(),
983 None => CommitDigest::MIN,
984 }
985 }
986
987 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
989 match &self.last_commit {
990 Some(commit) => commit.timestamp_ms(),
991 None => 0,
992 }
993 }
994
995 pub(crate) fn last_commit_leader(&self) -> Slot {
997 match &self.last_commit {
998 Some(commit) => commit.leader().into(),
999 None => self
1000 .genesis
1001 .iter()
1002 .next()
1003 .map(|(genesis_ref, _)| *genesis_ref)
1004 .expect("Genesis blocks should always be available.")
1005 .into(),
1006 }
1007 }
1008
1009 pub(crate) fn last_commit_round(&self) -> Round {
1011 match &self.last_commit {
1012 Some(commit) => commit.leader().round,
1013 None => 0,
1014 }
1015 }
1016
    /// Returns a copy of the highest committed round per authority.
    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
        self.last_committed_rounds.clone()
    }
1021
    /// Returns the gc round derived from the last commit round;
    /// see `calculate_gc_round()`.
    pub(crate) fn gc_round(&self) -> Round {
        self.calculate_gc_round(self.last_commit_round())
    }
1028
    /// Returns `commit_round` minus the protocol's gc depth, saturating at 0.
    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
    }
1034
1035 pub(crate) fn flush(&mut self) {
1045 let _s = self
1046 .context
1047 .metrics
1048 .node_metrics
1049 .scope_processing_time
1050 .with_label_values(&["DagState::flush"])
1051 .start_timer();
1052
1053 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1055 let pending_commits = std::mem::take(&mut self.commits_to_write);
1056 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1057 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1058 if pending_blocks.is_empty()
1059 && pending_commits.is_empty()
1060 && pending_commit_info.is_empty()
1061 && pending_finalized_commits.is_empty()
1062 {
1063 return;
1064 }
1065
1066 debug!(
1067 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1068 pending_blocks.len(),
1069 pending_blocks
1070 .iter()
1071 .map(|b| b.reference().to_string())
1072 .join(","),
1073 pending_commits.len(),
1074 pending_commits
1075 .iter()
1076 .map(|c| c.reference().to_string())
1077 .join(","),
1078 pending_commit_info.len(),
1079 pending_commit_info
1080 .iter()
1081 .map(|(commit_ref, _)| commit_ref.to_string())
1082 .join(","),
1083 pending_finalized_commits.len(),
1084 pending_finalized_commits
1085 .iter()
1086 .map(|(commit_ref, _)| commit_ref.to_string())
1087 .join(","),
1088 );
1089 self.store
1090 .write(WriteBatch::new(
1091 pending_blocks,
1092 pending_commits,
1093 pending_commit_info,
1094 pending_finalized_commits,
1095 ))
1096 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1097 self.context
1098 .metrics
1099 .node_metrics
1100 .dag_state_store_write_count
1101 .inc();
1102
1103 for (authority_index, _) in self.context.committee.authorities() {
1105 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1106 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1107 if block_ref.round <= eviction_round {
1108 self.recent_blocks.remove(block_ref);
1109 self.recent_refs_by_authority[authority_index].pop_first();
1110 } else {
1111 break;
1112 }
1113 }
1114 self.evicted_rounds[authority_index] = eviction_round;
1115 }
1116
1117 let metrics = &self.context.metrics.node_metrics;
1118 metrics
1119 .dag_state_recent_blocks
1120 .set(self.recent_blocks.len() as i64);
1121 metrics.dag_state_recent_refs.set(
1122 self.recent_refs_by_authority
1123 .iter()
1124 .map(BTreeSet::len)
1125 .sum::<usize>() as i64,
1126 );
1127 }
1128
    /// Reads the last stored commit info, if any, directly from the store.
    ///
    /// # Panics
    ///
    /// Panics if reading from the store fails.
    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
        self.store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
    }
1134
    /// Adds the given committed subdags to the scoring subdag.
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }
1138
    /// Clears the scoring subdag.
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }
1142
    /// Returns the number of scored subdags reported by the scoring subdag.
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }
1146
    /// Returns whether the scoring subdag is empty.
    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
        self.scoring_subdag.is_empty()
    }
1150
    /// Computes reputation scores from the scoring subdag using its
    /// distributed vote scoring.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }
1154
    /// Returns the end of the scoring subdag's commit range.
    ///
    /// # Panics
    ///
    /// Panics if the scoring subdag has no commit range.
    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
        self.scoring_subdag
            .commit_range
            .as_ref()
            .expect("commit range should exist for scoring subdag")
            .end()
    }
1162
1163 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1167 let last_round = self.recent_refs_by_authority[authority_index]
1168 .last()
1169 .map(|block_ref| block_ref.round)
1170 .unwrap_or(GENESIS_ROUND);
1171
1172 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1173 }
1174
1175 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1178 gc_round.min(last_round.saturating_sub(cached_rounds))
1179 }
1180
1181 pub(crate) fn store(&self) -> Arc<dyn Store> {
1183 self.store.clone()
1184 }
1185
1186 #[cfg(test)]
1189 pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1190 for round in
1193 (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1194 {
1195 if round == GENESIS_ROUND {
1196 return self.genesis_blocks();
1197 }
1198 use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1199 let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1200
1201 let blocks = self.get_uncommitted_blocks_at_round(round);
1203 for block in &blocks {
1204 if quorum.add(block.author(), &self.context.committee) {
1205 return blocks;
1206 }
1207 }
1208 }
1209
1210 panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1211 }
1212
    /// Test helper: returns copies of all genesis blocks, ordered by block reference.
    #[cfg(test)]
    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
        self.genesis.values().cloned().collect()
    }
1217
    /// Test helper: overrides the last commit without touching any other state.
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1222}
1223
/// A cached block together with the status flags tracked for it in `DagState`.
struct BlockInfo {
    /// The cached block.
    block: VerifiedBlock,
    /// Whether the block has been marked as committed (see `DagState::set_committed`).
    committed: bool,
    /// Whether the block has been marked as included by
    /// `DagState::link_causal_history`.
    included: bool,
}

impl BlockInfo {
    /// Wraps `block` with both flags cleared.
    fn new(block: VerifiedBlock) -> Self {
        Self {
            block,
            committed: false,
            included: false,
        }
    }
}
1246
1247#[cfg(test)]
1248mod test {
1249 use std::vec;
1250
1251 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1252 use parking_lot::RwLock;
1253
1254 use super::*;
1255 use crate::{
1256 block::{TestBlock, VerifiedBlock},
1257 storage::{WriteBatch, mem_store::MemStore},
1258 test_dag_builder::DagBuilder,
1259 test_dag_parser::parse_dag,
1260 };
1261
    /// Exercises block retrieval from `DagState` by reference, by slot and by round, for
    /// both populated and non-existent targets.
    #[tokio::test]
    async fn test_get_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());
        // Authority 0 is treated as "our own" authority throughout this test.
        let own_index = AuthorityIndex::new_for_test(0);

        // Populate rounds 1..=10 for the first 3 of the 4 committee members.
        let num_rounds: u32 = 10;
        // A round far beyond anything that gets populated below.
        let non_existent_round: u32 = 100;
        let num_authorities: u32 = 3;
        let num_blocks_per_slot: usize = 3;
        let mut blocks = BTreeMap::new();
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                // Consecutive timestamps make every block of the slot a different block.
                let base_ts = round as BlockTimestampMs * 1000;
                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
                    let block = VerifiedBlock::new_for_test(
                        TestBlock::new(round, author)
                            .set_timestamp_ms(timestamp)
                            .build(),
                    );
                    dag_state.accept_block(block.clone());
                    blocks.insert(block.reference(), block);

                    // The own authority gets exactly one block per slot.
                    // NOTE(review): presumably because DagState refuses own equivocation — confirm.
                    if AuthorityIndex::new_for_test(author) == own_index {
                        break;
                    }
                }
            }
        }

        // Every accepted block must be retrievable through its reference.
        for (r, block) in &blocks {
            assert_eq!(&dag_state.get_block(r).unwrap(), block);
        }

        // Same round and author as the last accepted block, but with `BlockDigest::MIN`
        // as digest: nothing is expected to be found for it.
        let last_ref = blocks.keys().last().unwrap();
        assert!(
            dag_state
                .get_block(&BlockRef::new(
                    last_ref.round,
                    last_ref.author,
                    BlockDigest::MIN
                ))
                .is_none()
        );

        // Slot lookups: one block for the own authority, `num_blocks_per_slot` for the others.
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                let slot = Slot::new(
                    round,
                    context
                        .committee
                        .to_authority_index(author as usize)
                        .unwrap(),
                );
                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);

                if AuthorityIndex::new_for_test(author) == own_index {
                    assert_eq!(blocks.len(), 1);
                } else {
                    assert_eq!(blocks.len(), num_blocks_per_slot);
                }

                // Whatever is returned must actually belong to the requested slot.
                for b in blocks {
                    assert_eq!(b.round(), round);
                    assert_eq!(
                        b.author(),
                        context
                            .committee
                            .to_authority_index(author as usize)
                            .unwrap()
                    );
                }
            }
        }

        // A slot in a round that was never populated yields nothing.
        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());

        // Round lookups: all blocks of the non-own authorities plus the single own block.
        for round in 1..=num_rounds {
            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
            assert_eq!(
                blocks.len(),
                (num_authorities - 1) as usize * num_blocks_per_slot + 1
            );
            for b in blocks {
                assert_eq!(b.round(), round);
            }
        }

        // A round that was never populated yields nothing.
        assert!(
            dag_state
                .get_uncommitted_blocks_at_round(non_existent_round)
                .is_empty()
        );
    }
1371
1372 #[tokio::test]
1373 async fn test_ancestors_at_uncommitted_round() {
1374 let (context, _) = Context::new_for_test(4);
1376 let context = Arc::new(context);
1377 let store = Arc::new(MemStore::new());
1378 let mut dag_state = DagState::new(context.clone(), store.clone());
1379
1380 let round_10_refs: Vec<_> = (0..4)
1384 .map(|a| {
1385 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1386 .reference()
1387 })
1388 .collect();
1389
1390 let round_11 = vec![
1392 VerifiedBlock::new_for_test(
1394 TestBlock::new(11, 0)
1395 .set_timestamp_ms(1100)
1396 .set_ancestors(round_10_refs.clone())
1397 .build(),
1398 ),
1399 VerifiedBlock::new_for_test(
1402 TestBlock::new(11, 1)
1403 .set_timestamp_ms(1110)
1404 .set_ancestors(round_10_refs.clone())
1405 .build(),
1406 ),
1407 VerifiedBlock::new_for_test(
1409 TestBlock::new(11, 1)
1410 .set_timestamp_ms(1111)
1411 .set_ancestors(round_10_refs.clone())
1412 .build(),
1413 ),
1414 VerifiedBlock::new_for_test(
1416 TestBlock::new(11, 1)
1417 .set_timestamp_ms(1112)
1418 .set_ancestors(round_10_refs.clone())
1419 .build(),
1420 ),
1421 VerifiedBlock::new_for_test(
1423 TestBlock::new(11, 2)
1424 .set_timestamp_ms(1120)
1425 .set_ancestors(round_10_refs.clone())
1426 .build(),
1427 ),
1428 VerifiedBlock::new_for_test(
1430 TestBlock::new(11, 3)
1431 .set_timestamp_ms(1130)
1432 .set_ancestors(round_10_refs.clone())
1433 .build(),
1434 ),
1435 ];
1436
1437 let ancestors_for_round_12 = vec![
1439 round_11[0].reference(),
1440 round_11[1].reference(),
1441 round_11[5].reference(),
1442 ];
1443 let round_12 = vec![
1444 VerifiedBlock::new_for_test(
1445 TestBlock::new(12, 0)
1446 .set_timestamp_ms(1200)
1447 .set_ancestors(ancestors_for_round_12.clone())
1448 .build(),
1449 ),
1450 VerifiedBlock::new_for_test(
1451 TestBlock::new(12, 2)
1452 .set_timestamp_ms(1220)
1453 .set_ancestors(ancestors_for_round_12.clone())
1454 .build(),
1455 ),
1456 VerifiedBlock::new_for_test(
1457 TestBlock::new(12, 3)
1458 .set_timestamp_ms(1230)
1459 .set_ancestors(ancestors_for_round_12.clone())
1460 .build(),
1461 ),
1462 ];
1463
1464 let ancestors_for_round_13 = vec![
1466 round_12[0].reference(),
1467 round_12[1].reference(),
1468 round_12[2].reference(),
1469 round_11[2].reference(),
1470 ];
1471 let round_13 = vec![
1472 VerifiedBlock::new_for_test(
1473 TestBlock::new(12, 1)
1474 .set_timestamp_ms(1300)
1475 .set_ancestors(ancestors_for_round_13.clone())
1476 .build(),
1477 ),
1478 VerifiedBlock::new_for_test(
1479 TestBlock::new(12, 2)
1480 .set_timestamp_ms(1320)
1481 .set_ancestors(ancestors_for_round_13.clone())
1482 .build(),
1483 ),
1484 VerifiedBlock::new_for_test(
1485 TestBlock::new(12, 3)
1486 .set_timestamp_ms(1330)
1487 .set_ancestors(ancestors_for_round_13.clone())
1488 .build(),
1489 ),
1490 ];
1491
1492 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1494 let anchor = VerifiedBlock::new_for_test(
1495 TestBlock::new(14, 1)
1496 .set_timestamp_ms(1410)
1497 .set_ancestors(ancestors_for_round_14)
1498 .build(),
1499 );
1500
1501 for b in round_11
1503 .iter()
1504 .chain(round_12.iter())
1505 .chain(round_13.iter())
1506 .chain([anchor.clone()].iter())
1507 {
1508 dag_state.accept_block(b.clone());
1509 }
1510
1511 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1513 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1514 ancestors_refs.sort();
1515 let mut expected_refs = vec![
1516 round_11[0].reference(),
1517 round_11[1].reference(),
1518 round_11[2].reference(),
1519 round_11[5].reference(),
1520 ];
1521 expected_refs.sort(); assert_eq!(
1523 ancestors_refs, expected_refs,
1524 "Expected round 11 ancestors: {:?}. Got: {:?}",
1525 expected_refs, ancestors_refs
1526 );
1527 }
1528
    /// Checks which blocks `link_causal_history` reports and marks as included, before and
    /// after the GC round has advanced.
    #[tokio::test]
    async fn test_link_causal_history() {
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = 10;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(3);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Rounds 1..=3 are built for all authorities; rounds 4..=6 skip authority 0.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layers(4..=6)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();

        // Accept every block of the DAG.
        let all_blocks = dag_builder.all_blocks();
        dag_state.accept_blocks(all_blocks.clone());

        // Right after acceptance no block counts as included.
        for block in &all_blocks {
            assert!(!dag_state.has_been_included(&block.reference()));
        }

        // Link the causal history of a round 1 block.
        let round_1_block = &all_blocks[1];
        assert_eq!(round_1_block.round(), 1);
        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());

        // Only the block itself is reported, and it is now marked as included.
        assert_eq!(linked_blocks.len(), 1);
        assert_eq!(linked_blocks[0], round_1_block.reference());
        for block_ref in linked_blocks {
            assert!(dag_state.has_been_included(&block_ref));
        }

        // Link the causal history of a round 2 block.
        let round_2_block = &all_blocks[4];
        assert_eq!(round_2_block.round(), 2);
        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());

        // Four blocks are reported: the round 2 block itself plus round 1 blocks.
        assert_eq!(linked_blocks.len(), 4);
        for block_ref in linked_blocks {
            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
        }

        // By now exactly the round 1 blocks and the linked round 2 block are included.
        for block in &all_blocks {
            if block.round() == 1 || block.reference() == round_2_block.reference() {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }

        // The last block of the DAG is expected to be a round 6 block.
        let round_6_block = all_blocks.last().unwrap();
        assert_eq!(round_6_block.round(), 6);

        // Install a last commit led by the round 6 block; with GC depth 3 the GC round
        // is asserted to be 3.
        let last_commit = TrustedCommit::new_for_test(
            6,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            round_6_block.reference(),
            vec![],
        );
        dag_state.set_last_commit(last_commit);
        assert_eq!(
            dag_state.gc_round(),
            3,
            "GC round should have moved to round 3"
        );

        // Link the causal history of the round 6 block.
        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());

        // Seven blocks are reported: rounds 4 and 5 plus the round 6 block itself. Nothing
        // at or below the GC round shows up.
        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
        for block_ref in linked_blocks {
            assert!(
                block_ref.round == 4
                    || block_ref.round == 5
                    || block_ref == round_6_block.reference()
            );
        }

        // Final included set: everything linked so far; all other blocks stay excluded.
        for block in &all_blocks {
            let block_ref = block.reference();
            if block.round() == 1
                || block_ref == round_2_block.reference()
                || block_ref.round == 4
                || block_ref.round == 5
                || block_ref == round_6_block.reference()
            {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }
    }
1638
1639 #[tokio::test]
1640 async fn test_contains_blocks_in_cache_or_store() {
1641 const CACHED_ROUNDS: Round = 2;
1643
1644 let (mut context, _) = Context::new_for_test(4);
1645 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1646
1647 let context = Arc::new(context);
1648 let store = Arc::new(MemStore::new());
1649 let mut dag_state = DagState::new(context.clone(), store.clone());
1650
1651 let num_rounds: u32 = 10;
1653 let num_authorities: u32 = 4;
1654 let mut blocks = Vec::new();
1655
1656 for round in 1..=num_rounds {
1657 for author in 0..num_authorities {
1658 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1659 blocks.push(block);
1660 }
1661 }
1662
1663 blocks.clone().into_iter().for_each(|block| {
1665 if block.round() <= 4 {
1666 store
1667 .write(WriteBatch::default().blocks(vec![block]))
1668 .unwrap();
1669 } else {
1670 dag_state.accept_blocks(vec![block]);
1671 }
1672 });
1673
1674 let mut block_refs = blocks
1677 .iter()
1678 .map(|block| block.reference())
1679 .collect::<Vec<_>>();
1680 let result = dag_state.contains_blocks(block_refs.clone());
1681
1682 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1684 assert_eq!(result, expected);
1685
1686 block_refs.insert(
1688 3,
1689 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1690 );
1691 let result = dag_state.contains_blocks(block_refs.clone());
1692
1693 expected.insert(3, false);
1695 assert_eq!(result, expected.clone());
1696 }
1697
1698 #[tokio::test]
1699 async fn test_contains_cached_block_at_slot() {
1700 const CACHED_ROUNDS: Round = 2;
1702
1703 let num_authorities: u32 = 4;
1704 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1705 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1706
1707 let context = Arc::new(context);
1708 let store = Arc::new(MemStore::new());
1709 let mut dag_state = DagState::new(context.clone(), store.clone());
1710
1711 let num_rounds: u32 = 10;
1713 let mut blocks = Vec::new();
1714
1715 for round in 1..=num_rounds {
1716 for author in 0..num_authorities {
1717 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1718 blocks.push(block.clone());
1719 dag_state.accept_block(block);
1720 }
1721 }
1722
1723 for (author, _) in context.committee.authorities() {
1725 assert!(
1726 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1727 "Genesis should always be found"
1728 );
1729 }
1730
1731 let mut block_refs = blocks
1734 .iter()
1735 .map(|block| block.reference())
1736 .collect::<Vec<_>>();
1737
1738 for block_ref in block_refs.clone() {
1739 let slot = block_ref.into();
1740 let found = dag_state.contains_cached_block_at_slot(slot);
1741 assert!(found, "A block should be found at slot {}", slot);
1742 }
1743
1744 block_refs.insert(
1747 3,
1748 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1749 );
1750 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1751 expected.insert(3, false);
1752
1753 for block_ref in block_refs {
1755 let slot = block_ref.into();
1756 let found = dag_state.contains_cached_block_at_slot(slot);
1757
1758 assert_eq!(expected.remove(0), found);
1759 }
1760 }
1761
    /// Expects `contains_cached_block_at_slot` to panic with the message below when asked
    /// for a slot at round 3 after the GC round has reached 3.
    ///
    /// The test carries `#[ignore]`, so it only runs when explicitly requested; the reason
    /// for ignoring it is not visible in this file section.
    #[tokio::test]
    #[ignore]
    #[should_panic(
        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
    )]
    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
        const GC_DEPTH: u32 = 2;
        const CACHED_ROUNDS: Round = 3;

        let (mut context, _) = Context::new_for_test(4);
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Rounds 1..=3 are built for all authorities; rounds 4..=6 skip authority 0.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layers(4..=6)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();

        // Accept every block of the DAG.
        dag_builder
            .all_blocks()
            .into_iter()
            .for_each(|block| dag_state.accept_block(block));

        // Add a commit whose leader is the round 5 leader block, then flush.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            0,
            dag_builder.leader_block(5).unwrap().reference(),
            vec![],
        ));
        // NOTE(review): presumably the flush is what evicts cached blocks — confirm.
        dag_state.flush();

        // Leader round 5 with GC depth 2 is asserted to give GC round 3.
        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");

        // Authorities 1..=3 must still have their blocks of rounds 4..=6 in the cache.
        for authority_index in 1..=3 {
            for round in 4..=6 {
                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
                    round,
                    AuthorityIndex::new_for_test(authority_index)
                )));
            }
        }

        // Authority 0 produced nothing after round 3; its rounds 1..=3 must still be found.
        for round in 1..=3 {
            assert!(
                dag_state.contains_cached_block_at_slot(Slot::new(
                    round,
                    AuthorityIndex::new_for_test(0)
                ))
            );
        }

        // Asking for authority 1 at round 3 is the call expected to panic.
        let _ =
            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
    }
1839
1840 #[tokio::test]
1841 async fn test_get_blocks_in_cache_or_store() {
1842 let (context, _) = Context::new_for_test(4);
1843 let context = Arc::new(context);
1844 let store = Arc::new(MemStore::new());
1845 let mut dag_state = DagState::new(context.clone(), store.clone());
1846
1847 let num_rounds: u32 = 10;
1849 let num_authorities: u32 = 4;
1850 let mut blocks = Vec::new();
1851
1852 for round in 1..=num_rounds {
1853 for author in 0..num_authorities {
1854 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1855 blocks.push(block);
1856 }
1857 }
1858
1859 blocks.clone().into_iter().for_each(|block| {
1861 if block.round() <= 4 {
1862 store
1863 .write(WriteBatch::default().blocks(vec![block]))
1864 .unwrap();
1865 } else {
1866 dag_state.accept_blocks(vec![block]);
1867 }
1868 });
1869
1870 let mut block_refs = blocks
1873 .iter()
1874 .map(|block| block.reference())
1875 .collect::<Vec<_>>();
1876 let result = dag_state.get_blocks(&block_refs);
1877
1878 let mut expected = blocks
1879 .into_iter()
1880 .map(Some)
1881 .collect::<Vec<Option<VerifiedBlock>>>();
1882
1883 assert_eq!(result, expected.clone());
1885
1886 block_refs.insert(
1888 3,
1889 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1890 );
1891 let result = dag_state.get_blocks(&block_refs);
1892
1893 expected.insert(3, None);
1895 assert_eq!(result, expected);
1896 }
1897
    /// Flushes part of a DAG (blocks and commits) to the store, keeps adding unflushed data,
    /// then rebuilds `DagState` from the store and checks that exactly the flushed state is
    /// recovered.
    #[tokio::test]
    async fn test_flush_and_recovery() {
        telemetry_subscribers::init_for_testing();

        const GC_DEPTH: u32 = 3;
        const CACHED_ROUNDS: u32 = 4;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build 20 rounds; authority 0 is skipped for rounds 6..=8.
        const NUM_ROUNDS: Round = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();
        dag_builder
            .layers(6..=8)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(9..=NUM_ROUNDS).build();

        // Keep the first 15 commits; the 15th is asserted to be at round 16.
        const LAST_COMMIT_ROUND: Round = 16;
        const LAST_COMMIT_INDEX: CommitIndex = 15;
        let commits = dag_builder
            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
            .into_iter()
            .map(|(_subdag, commit)| commit)
            .take(LAST_COMMIT_INDEX as usize)
            .collect::<Vec<_>>();
        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);

        // Add the blocks of rounds 1..=12 and the first 8 commits; this is what gets flushed.
        // The 8th commit is asserted to be at round 9.
        // NOTE(review): index and round presumably diverge because a leader slot of
        // authority 0 in rounds 6..=8 has no block — confirm.
        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
        const NUM_PERSISTED_COMMITS: usize = 8;
        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
        let mut finalized_commits = vec![];
        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
            finalized_commits.push(commit.clone());
            dag_state.add_commit(commit);
        }
        let last_finalized_commit = finalized_commits.last().unwrap();
        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);

        // Collect the blocks referenced by the commits added so far.
        let finalized_blocks = finalized_commits
            .iter()
            .flat_map(|commit| commit.blocks())
            .collect::<BTreeSet<_>>();

        // Flush the dag state.
        dag_state.flush();

        // The store must now hold the blocks up to round 12 and the first 8 commits.
        let store_blocks = store
            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
            .unwrap();
        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
        assert_eq!(
            store_commits.last().unwrap().index(),
            LAST_PERSISTED_COMMIT_INDEX
        );
        assert_eq!(
            store_commits.last().unwrap().round(),
            LAST_PERSISTED_COMMIT_ROUND
        );

        // Add the remaining blocks and commits; these are never flushed.
        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
            dag_state.add_commit(commit);
        }

        // The live DagState must be able to serve every block of the DAG.
        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // The live DagState has seen all 15 commits.
        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);

        // Throw the state away without flushing the additional data.
        drop(dag_state);

        // Recover a fresh state from the store.
        let dag_state = DagState::new(context.clone(), store.clone());

        // Blocks of the flushed rounds must be found again.
        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Blocks that were never flushed must be gone.
        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
        let block_refs = missing_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let retrieved_blocks = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        assert!(retrieved_blocks.is_empty());

        // The recovered commit position is the last flushed commit.
        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);

        // Per-authority last committed rounds expected after recovery.
        let expected_last_committed_rounds = vec![5, 9, 8, 8];
        assert_eq!(
            dag_state.last_committed_rounds(),
            expected_last_committed_rounds
        );
        // One scoring subdag per flushed commit is expected after recovery.
        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);

        // Check the recovered cache and eviction round of every authority.
        for (authority_index, _) in context.committee.authorities() {
            let blocks = dag_state.get_cached_blocks(authority_index, 1);

            // Every authority is expected to have eviction round 6 and cached blocks only
            // within rounds 7..=12. Authority 0 skipped rounds 6..=8, which leaves it with
            // 4 blocks where the others have 6.
            // NOTE(review): 6 matches `eviction_round(12, 6, 4)`; confirm that this is how
            // the recovery path derives it.
            if authority_index == AuthorityIndex::new_for_test(0) {
                assert_eq!(blocks.len(), 4);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            } else {
                assert_eq!(blocks.len(), 6);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            }
        }

        // Blocks above the GC round that belong to a flushed commit must be flagged as
        // committed again.
        let gc_round = dag_state.gc_round();
        assert_eq!(gc_round, 6);
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
                    assert!(
                        block_info.committed,
                        "Block {:?} should be set as committed",
                        block_ref
                    );
                }
            });

        // The `included` flag must be recovered as well: set for every block below round 12
        // and for authority 0's blocks, cleared for everything else.
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
                    assert!(block_info.included);
                } else {
                    assert!(!block_info.included);
                }
            });
    }
2105
2106 #[tokio::test]
2107 async fn test_block_info_as_committed() {
2108 let num_authorities: u32 = 4;
2109 let (context, _) = Context::new_for_test(num_authorities as usize);
2110 let context = Arc::new(context);
2111
2112 let store = Arc::new(MemStore::new());
2113 let mut dag_state = DagState::new(context.clone(), store.clone());
2114
2115 let block = VerifiedBlock::new_for_test(
2117 TestBlock::new(1, 0)
2118 .set_timestamp_ms(1000)
2119 .set_ancestors(vec![])
2120 .build(),
2121 );
2122
2123 dag_state.accept_block(block.clone());
2124
2125 assert!(!dag_state.is_committed(&block.reference()));
2127
2128 assert!(
2130 dag_state.set_committed(&block.reference()),
2131 "Block should be successfully set as committed for first time"
2132 );
2133
2134 assert!(dag_state.is_committed(&block.reference()));
2136
2137 assert!(
2139 !dag_state.set_committed(&block.reference()),
2140 "Block should not be successfully set as committed"
2141 );
2142 }
2143
2144 #[tokio::test]
2145 async fn test_get_cached_blocks() {
2146 let (mut context, _) = Context::new_for_test(4);
2147 context.parameters.dag_state_cached_rounds = 5;
2148
2149 let context = Arc::new(context);
2150 let store = Arc::new(MemStore::new());
2151 let mut dag_state = DagState::new(context.clone(), store.clone());
2152
2153 let mut all_blocks = Vec::new();
2158 for author in 1..=3 {
2159 for round in 10..(10 + author) {
2160 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2161 all_blocks.push(block.clone());
2162 dag_state.accept_block(block);
2163 }
2164 }
2165
2166 let cached_blocks =
2169 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2170 assert!(cached_blocks.is_empty());
2171
2172 let cached_blocks =
2173 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2174 assert_eq!(cached_blocks.len(), 1);
2175 assert_eq!(cached_blocks[0].round(), 10);
2176
2177 let cached_blocks =
2178 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2179 assert_eq!(cached_blocks.len(), 2);
2180 assert_eq!(cached_blocks[0].round(), 10);
2181 assert_eq!(cached_blocks[1].round(), 11);
2182
2183 let cached_blocks =
2184 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2185 assert_eq!(cached_blocks.len(), 1);
2186 assert_eq!(cached_blocks[0].round(), 11);
2187
2188 let cached_blocks =
2189 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2190 assert_eq!(cached_blocks.len(), 3);
2191 assert_eq!(cached_blocks[0].round(), 10);
2192 assert_eq!(cached_blocks[1].round(), 11);
2193 assert_eq!(cached_blocks[2].round(), 12);
2194
2195 let cached_blocks =
2196 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2197 assert_eq!(cached_blocks.len(), 1);
2198 assert_eq!(cached_blocks[0].round(), 12);
2199
2200 let cached_blocks = dag_state.get_cached_blocks_in_range(
2204 context.committee.to_authority_index(3).unwrap(),
2205 10,
2206 10,
2207 1,
2208 );
2209 assert!(cached_blocks.is_empty());
2210
2211 let cached_blocks = dag_state.get_cached_blocks_in_range(
2213 context.committee.to_authority_index(3).unwrap(),
2214 11,
2215 10,
2216 1,
2217 );
2218 assert!(cached_blocks.is_empty());
2219
2220 let cached_blocks = dag_state.get_cached_blocks_in_range(
2222 context.committee.to_authority_index(0).unwrap(),
2223 9,
2224 10,
2225 1,
2226 );
2227 assert!(cached_blocks.is_empty());
2228
2229 let cached_blocks = dag_state.get_cached_blocks_in_range(
2231 context.committee.to_authority_index(1).unwrap(),
2232 9,
2233 11,
2234 1,
2235 );
2236 assert_eq!(cached_blocks.len(), 1);
2237 assert_eq!(cached_blocks[0].round(), 10);
2238
2239 let cached_blocks = dag_state.get_cached_blocks_in_range(
2241 context.committee.to_authority_index(2).unwrap(),
2242 9,
2243 12,
2244 5,
2245 );
2246 assert_eq!(cached_blocks.len(), 2);
2247 assert_eq!(cached_blocks[0].round(), 10);
2248 assert_eq!(cached_blocks[1].round(), 11);
2249
2250 let cached_blocks = dag_state.get_cached_blocks_in_range(
2252 context.committee.to_authority_index(3).unwrap(),
2253 11,
2254 20,
2255 5,
2256 );
2257 assert_eq!(cached_blocks.len(), 2);
2258 assert_eq!(cached_blocks[0].round(), 11);
2259 assert_eq!(cached_blocks[1].round(), 12);
2260
2261 let cached_blocks = dag_state.get_cached_blocks_in_range(
2263 context.committee.to_authority_index(3).unwrap(),
2264 10,
2265 20,
2266 1,
2267 );
2268 assert_eq!(cached_blocks.len(), 1);
2269 assert_eq!(cached_blocks[0].round(), 10);
2270 }
2271
    /// Checks `get_last_cached_block_per_authority` and `get_last_cached_block_in_range`
    /// before and after a flush, on a DAG in which each authority stops producing blocks
    /// at a different round.
    #[tokio::test]
    async fn test_get_last_cached_block() {
        const CACHED_ROUNDS: Round = 2;
        const GC_DEPTH: u32 = 1;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Per the DAG text: A has no blocks beyond round 0, B stops at round 1, C at round 2
        // and D at round 3.
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 2 : {
                C -> [*],
                D -> [*],
            },
            Round 3 : {
                D -> [*],
            },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");

        // An extra block at round 2 for the authority with index 2.
        // NOTE(review): presumably this equivocates with that authority's round 2 block
        // from the DAG — confirm.
        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());

        // Accept the DAG blocks followed by the extra block.
        for block in dag_builder
            .all_blocks()
            .into_iter()
            .chain(std::iter::once(block))
        {
            dag_state.accept_block(block);
        }

        // Add a commit whose leader is the round 3 leader block.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // Last blocks per authority for end round 4. The second tuple element is expected
        // to hold one extra block for authority 2 and none for the others.
        let end_round = 4;
        let expected_rounds = vec![0, 1, 2, 3];
        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );
        assert_eq!(
            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
            expected_excluded_and_equivocating_blocks
        );

        // The range lookup starting at round 0 must agree with the per-authority lookup.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // With start round 2, authorities without a block in range yield `None`, which
        // `unwrap_or_default` turns into round 0.
        let start_round = 2;
        let expected_rounds = [0, 0, 2, 3];

        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    start_round,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // Flush the state.
        // NOTE(review): presumably this evicts cached blocks according to the commit added
        // above — confirm.
        dag_state.flush();

        // Repeat the lookups with end round 3.
        let end_round = 3;
        let expected_rounds = vec![0, 1, 2, 2];

        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );

        // The range lookup must agree again.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }
    }
2405
    /// Expects `get_last_cached_block_per_authority` to panic with the message below when,
    /// after a commit and a flush, it is asked with end round 2.
    #[tokio::test]
    #[should_panic(
        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
    )]
    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
        const CACHED_ROUNDS: Round = 1;
        const GC_DEPTH: u32 = 1;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Each round skips one more authority: round 1 skips authority 0, round 2 skips
        // authorities 0 and 1, round 3 skips authorities 0, 1 and 2.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=1)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder
            .layers(2..=2)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1),
            ])
            .skip_block()
            .build();
        dag_builder
            .layers(3..=3)
            .authorities(vec![
                AuthorityIndex::new_for_test(0),
                AuthorityIndex::new_for_test(1),
                AuthorityIndex::new_for_test(2),
            ])
            .skip_block()
            .build();

        // Accept every block of the DAG.
        for block in dag_builder.all_blocks() {
            dag_state.accept_block(block);
        }

        // Add a commit whose leader is the round 3 leader block.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            0,
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // Flush the state.
        // NOTE(review): presumably the flush is what moves the evicted rounds forward — confirm.
        dag_state.flush();

        // This call is expected to panic with the message declared on the test.
        dag_state.get_last_cached_block_per_authority(2);
    }
2471
    /// Checks `last_quorum` on an empty DAG, on a DAG built up to round 4, and after a
    /// single additional round 5 block.
    #[tokio::test]
    async fn test_last_quorum() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        // With no accepted blocks, the genesis blocks are returned.
        {
            let genesis = genesis_blocks(context.as_ref());

            assert_eq!(dag_state.read().last_quorum(), genesis);
        }

        // With layers 1..=4 built and persisted, the round 4 blocks are returned.
        {
            let mut dag_builder = DagBuilder::new(context.clone());
            dag_builder
                .layers(1..=4)
                .build()
                .persist_layers(dag_state.clone());
            let round_4_blocks: Vec<_> = dag_builder
                .blocks(4..=4)
                .into_iter()
                .map(|block| block.reference())
                .collect();

            let last_quorum = dag_state.read().last_quorum();

            assert_eq!(
                last_quorum
                    .into_iter()
                    .map(|block| block.reference())
                    .collect::<Vec<_>>(),
                round_4_blocks
            );
        }

        // One more block at round 5 is not a quorum, so round 4 is still returned.
        {
            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
            dag_state.write().accept_block(block);

            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);

            let last_quorum = dag_state.read().last_quorum();

            assert_eq!(last_quorum, round_4_blocks);
        }
    }
2523
2524 #[tokio::test]
2525 async fn test_last_block_for_authority() {
2526 let (context, _) = Context::new_for_test(4);
2528 let context = Arc::new(context);
2529 let store = Arc::new(MemStore::new());
2530 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2531
2532 {
2534 let genesis = genesis_blocks(context.as_ref());
2535 let my_genesis = genesis
2536 .into_iter()
2537 .find(|block| block.author() == context.own_index)
2538 .unwrap();
2539
2540 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2541 }
2542
2543 {
2545 let mut dag_builder = DagBuilder::new(context.clone());
2547 dag_builder
2548 .layers(1..=4)
2549 .build()
2550 .persist_layers(dag_state.clone());
2551
2552 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2554 dag_state.write().accept_block(block);
2555
2556 let block = dag_state
2557 .read()
2558 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2559 assert_eq!(block.round(), 5);
2560
2561 for (authority_index, _) in context.committee.authorities() {
2562 let block = dag_state
2563 .read()
2564 .get_last_block_for_authority(authority_index);
2565
2566 if authority_index.value() == 0 {
2567 assert_eq!(block.round(), 5);
2568 } else {
2569 assert_eq!(block.round(), 4);
2570 }
2571 }
2572 }
2573 }
2574
2575 #[tokio::test]
2576 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2577 let (context, _) = Context::new_for_test(4);
2579 let context = Arc::new(context);
2580 let store = Arc::new(MemStore::new());
2581 let mut dag_state = DagState::new(context.clone(), store.clone());
2582
2583 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2585
2586 let block = VerifiedBlock::new_for_test(
2587 TestBlock::new(10, 0)
2588 .set_timestamp_ms(block_timestamp)
2589 .build(),
2590 );
2591
2592 dag_state.accept_block(block);
2594 }
2595
2596 #[tokio::test]
2597 async fn test_last_finalized_commit() {
2598 let (context, _) = Context::new_for_test(4);
2600 let context = Arc::new(context);
2601 let store = Arc::new(MemStore::new());
2602 let mut dag_state = DagState::new(context.clone(), store.clone());
2603
2604 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2606 let rejected_transactions = BTreeMap::new();
2607 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2608
2609 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2611 assert_eq!(
2612 dag_state.finalized_commits_to_write[0],
2613 (commit_ref, rejected_transactions.clone())
2614 );
2615
2616 dag_state.flush();
2618
2619 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2621 assert_eq!(last_finalized_commit, Some(commit_ref));
2622 let stored_rejected_transactions = store
2623 .read_rejected_transactions(commit_ref)
2624 .unwrap()
2625 .unwrap();
2626 assert_eq!(stored_rejected_transactions, rejected_transactions);
2627 }
2628}