1use std::{
5 cmp::max,
6 collections::{BTreeMap, BTreeSet, VecDeque},
7 ops::Bound::{Excluded, Included, Unbounded},
8 panic,
9 sync::Arc,
10 time::Duration,
11 vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21 CommittedSubDag,
22 block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23 commit::{
24 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25 GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26 },
27 context::Context,
28 leader_scoring::{ReputationScores, ScoringSubdag},
29 storage::{Store, WriteBatch},
30 threshold_clock::ThresholdClock,
31};
32
33pub struct DagState {
41 context: Arc<Context>,
42
43 genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46 recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55 recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59 threshold_clock: ThresholdClock,
61
62 evicted_rounds: Vec<Round>,
66
67 highest_accepted_round: Round,
69
70 last_commit: Option<TrustedCommit>,
72
73 last_commit_round_advancement_time: Option<std::time::Instant>,
75
76 last_committed_rounds: Vec<Round>,
78
79 scoring_subdag: ScoringSubdag,
82
83 pending_commit_votes: VecDeque<CommitVote>,
87
88 blocks_to_write: Vec<VerifiedBlock>,
91 commits_to_write: Vec<TrustedCommit>,
92
93 commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98 finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101 store: Arc<dyn Store>,
103
104 cached_rounds: Round,
106}
107
108impl DagState {
109 pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111 let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112 let num_authorities = context.committee.size();
113
114 let genesis = genesis_blocks(context.as_ref())
115 .into_iter()
116 .map(|block| (block.reference(), block))
117 .collect();
118
119 let threshold_clock = ThresholdClock::new(1, context.clone());
120
121 let last_commit = store
122 .read_last_commit()
123 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125 let commit_info = store
126 .read_last_commit_info()
127 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128 let (mut last_committed_rounds, commit_recovery_start_index) =
129 if let Some((commit_ref, commit_info)) = commit_info {
130 tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131 (commit_info.committed_rounds, commit_ref.index + 1)
132 } else {
133 tracing::info!("Found no stored CommitInfo to recover from");
134 (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135 };
136
137 let mut unscored_committed_subdags = Vec::new();
138 let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140 if let Some(last_commit) = last_commit.as_ref() {
141 store
142 .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144 .iter()
145 .for_each(|commit| {
146 for block_ref in commit.blocks() {
147 last_committed_rounds[block_ref.author] =
148 max(last_committed_rounds[block_ref.author], block_ref.round);
149 }
150 let committed_subdag =
151 load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
152 unscored_committed_subdags.push(committed_subdag);
154 });
155 }
156
157 tracing::info!(
158 "DagState was initialized with the following state: \
159 {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
160 unscored_committed_subdags.len()
161 );
162
163 scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
164
165 let mut state = Self {
166 context: context.clone(),
167 genesis,
168 recent_blocks: BTreeMap::new(),
169 recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
170 threshold_clock,
171 highest_accepted_round: 0,
172 last_commit: last_commit.clone(),
173 last_commit_round_advancement_time: None,
174 last_committed_rounds: last_committed_rounds.clone(),
175 pending_commit_votes: VecDeque::new(),
176 blocks_to_write: vec![],
177 commits_to_write: vec![],
178 commit_info_to_write: vec![],
179 finalized_commits_to_write: vec![],
180 scoring_subdag,
181 store: store.clone(),
182 cached_rounds,
183 evicted_rounds: vec![0; num_authorities],
184 };
185
186 for (authority_index, _) in context.committee.authorities() {
187 let (blocks, eviction_round) = {
188 let last_block = state
191 .store
192 .scan_last_blocks_by_author(authority_index, 1, None)
193 .expect("Database error");
194 let last_block_round = last_block
195 .last()
196 .map(|b| b.round())
197 .unwrap_or(GENESIS_ROUND);
198
199 let eviction_round =
200 Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
201 let blocks = state
202 .store
203 .scan_blocks_by_author(authority_index, eviction_round + 1)
204 .expect("Database error");
205
206 (blocks, eviction_round)
207 };
208
209 state.evicted_rounds[authority_index] = eviction_round;
210
211 for block in &blocks {
213 state.update_block_metadata(block);
214 }
215
216 debug!(
217 "Recovered blocks {}: {:?}",
218 authority_index,
219 blocks
220 .iter()
221 .map(|b| b.reference())
222 .collect::<Vec<BlockRef>>()
223 );
224 }
225
226 if let Some(last_commit) = last_commit {
227 let mut index = last_commit.index();
228 let gc_round = state.gc_round();
229 info!(
230 "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
231 index, gc_round
232 );
233
234 loop {
235 let commits = store
236 .scan_commits((index..=index).into())
237 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
238 let Some(commit) = commits.first() else {
239 info!("Recovering finished up to index {index}, no more commits to recover");
240 break;
241 };
242
243 if gc_round > 0 && commit.leader().round <= gc_round {
245 info!(
246 "Recovering finished, reached commit leader round {} <= gc_round {}",
247 commit.leader().round,
248 gc_round
249 );
250 break;
251 }
252
253 commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
254 debug!(
255 "Setting block {:?} as committed based on commit {:?}",
256 block_ref,
257 commit.index()
258 );
259 assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
260 });
261
262 index = index.saturating_sub(1);
264 if index == 0 {
265 break;
266 }
267 }
268 }
269
270 let proposed_blocks = store
272 .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
273 .expect("Database error");
274 for block in proposed_blocks {
275 state.link_causal_history(block.reference());
276 }
277
278 state
279 }
280
281 pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283 assert_ne!(
284 block.round(),
285 0,
286 "Genesis block should not be accepted into DAG."
287 );
288
289 let block_ref = block.reference();
290 if self.contains_block(&block_ref) {
291 return;
292 }
293
294 let now = self.context.clock.timestamp_utc_ms();
295 if block.timestamp_ms() > now {
296 trace!(
297 "Block {:?} with timestamp {} is greater than local timestamp {}.",
298 block,
299 block.timestamp_ms(),
300 now,
301 );
302 }
303 let hostname = &self.context.committee.authority(block_ref.author).hostname;
304 self.context
305 .metrics
306 .node_metrics
307 .accepted_block_time_drift_ms
308 .with_label_values(&[hostname])
309 .inc_by(block.timestamp_ms().saturating_sub(now));
310
311 if block_ref.author == self.context.own_index {
314 let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315 assert!(
316 existing_blocks.is_empty(),
317 "Block Rejected! Attempted to add block {block:#?} to own slot where \
318 block(s) {existing_blocks:#?} already exists."
319 );
320 }
321 self.update_block_metadata(&block);
322 self.blocks_to_write.push(block);
323 let source = if self.context.own_index == block_ref.author {
324 "own"
325 } else {
326 "others"
327 };
328 self.context
329 .metrics
330 .node_metrics
331 .accepted_blocks
332 .with_label_values(&[source])
333 .inc();
334 }
335
336 fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338 let block_ref = block.reference();
339 self.recent_blocks
340 .insert(block_ref, BlockInfo::new(block.clone()));
341 self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343 if self.threshold_clock.add_block(block_ref) {
344 let last_proposed_block = self.get_last_proposed_block();
346 if last_proposed_block.round() == block_ref.round {
347 let quorum_delay_ms = self
348 .context
349 .clock
350 .timestamp_utc_ms()
351 .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352 self.context
353 .metrics
354 .node_metrics
355 .quorum_receive_latency
356 .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357 }
358 }
359
360 self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361 self.context
362 .metrics
363 .node_metrics
364 .highest_accepted_round
365 .set(self.highest_accepted_round as i64);
366
367 let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368 .last()
369 .map(|block_ref| block_ref.round)
370 .expect("There should be by now at least one block ref");
371 let hostname = &self.context.committee.authority(block_ref.author).hostname;
372 self.context
373 .metrics
374 .node_metrics
375 .highest_accepted_authority_round
376 .with_label_values(&[hostname])
377 .set(highest_accepted_round_for_author as i64);
378 }
379
380 pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382 debug!(
383 "Accepting blocks: {}",
384 blocks.iter().map(|b| b.reference().to_string()).join(",")
385 );
386 for block in blocks {
387 self.accept_block(block);
388 }
389 }
390
391 pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394 self.get_blocks(&[*reference])
395 .pop()
396 .expect("Exactly one element should be returned")
397 }
398
399 pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402 let mut blocks = vec![None; block_refs.len()];
403 let mut missing = Vec::new();
404
405 for (index, block_ref) in block_refs.iter().enumerate() {
406 if block_ref.round == GENESIS_ROUND {
407 if let Some(block) = self.genesis.get(block_ref) {
409 blocks[index] = Some(block.clone());
410 }
411 continue;
412 }
413 if let Some(block_info) = self.recent_blocks.get(block_ref) {
414 blocks[index] = Some(block_info.block.clone());
415 continue;
416 }
417 missing.push((index, block_ref));
418 }
419
420 if missing.is_empty() {
421 return blocks;
422 }
423
424 let missing_refs = missing
425 .iter()
426 .map(|(_, block_ref)| **block_ref)
427 .collect::<Vec<_>>();
428 let store_results = self
429 .store
430 .read_blocks(&missing_refs)
431 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432 self.context
433 .metrics
434 .node_metrics
435 .dag_state_store_read_count
436 .with_label_values(&["get_blocks"])
437 .inc();
438
439 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440 blocks[index] = result;
441 }
442
443 blocks
444 }
445
446 pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449 let mut blocks = vec![];
453 for (_block_ref, block_info) in self.recent_blocks.range((
454 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456 )) {
457 blocks.push(block_info.block.clone())
458 }
459 blocks
460 }
461
462 pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465 if round <= self.last_commit_round() {
466 panic!("Round {} have committed blocks!", round);
467 }
468
469 let mut blocks = vec![];
470 for (_block_ref, block_info) in self.recent_blocks.range((
471 Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472 Excluded(BlockRef::new(
473 round + 1,
474 AuthorityIndex::ZERO,
475 BlockDigest::MIN,
476 )),
477 )) {
478 blocks.push(block_info.block.clone())
479 }
480 blocks
481 }
482
483 pub(crate) fn ancestors_at_round(
485 &self,
486 later_block: &VerifiedBlock,
487 earlier_round: Round,
488 ) -> Vec<VerifiedBlock> {
489 let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491 while !linked.is_empty() {
492 let round = linked.last().unwrap().round;
493 if round <= earlier_round {
495 break;
496 }
497 let block_ref = linked.pop_last().unwrap();
498 let Some(block) = self.get_block(&block_ref) else {
499 panic!("Block {:?} should exist in DAG!", block_ref);
500 };
501 linked.extend(block.ancestors().iter().cloned());
502 }
503 linked
504 .range((
505 Included(BlockRef::new(
506 earlier_round,
507 AuthorityIndex::ZERO,
508 BlockDigest::MIN,
509 )),
510 Unbounded,
511 ))
512 .map(|r| {
513 self.get_block(r)
514 .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515 .clone()
516 })
517 .collect()
518 }
519
520 pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
523 self.get_last_block_for_authority(self.context.own_index)
524 }
525
526 pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529 if let Some(last) = self.recent_refs_by_authority[authority].last() {
530 return self
531 .recent_blocks
532 .get(last)
533 .expect("Block should be found in recent blocks")
534 .block
535 .clone();
536 }
537
538 let (_, genesis_block) = self
540 .genesis
541 .iter()
542 .find(|(block_ref, _)| block_ref.author == authority)
543 .expect("Genesis should be found for authority {authority_index}");
544 genesis_block.clone()
545 }
546
547 pub(crate) fn get_cached_blocks(
553 &self,
554 authority: AuthorityIndex,
555 start: Round,
556 ) -> Vec<VerifiedBlock> {
557 self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
558 }
559
560 pub(crate) fn get_cached_blocks_in_range(
563 &self,
564 authority: AuthorityIndex,
565 start_round: Round,
566 end_round: Round,
567 limit: usize,
568 ) -> Vec<VerifiedBlock> {
569 if start_round >= end_round || limit == 0 {
570 return vec![];
571 }
572
573 let mut blocks = vec![];
574 for block_ref in self.recent_refs_by_authority[authority].range((
575 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576 Excluded(BlockRef::new(
577 end_round,
578 AuthorityIndex::MIN,
579 BlockDigest::MIN,
580 )),
581 )) {
582 let block_info = self
583 .recent_blocks
584 .get(block_ref)
585 .expect("Block should exist in recent blocks");
586 blocks.push(block_info.block.clone());
587 if blocks.len() >= limit {
588 break;
589 }
590 }
591 blocks
592 }
593
594 pub(crate) fn get_last_cached_block_in_range(
596 &self,
597 authority: AuthorityIndex,
598 start_round: Round,
599 end_round: Round,
600 ) -> Option<VerifiedBlock> {
601 if start_round >= end_round {
602 return None;
603 }
604
605 let block_ref = self.recent_refs_by_authority[authority]
606 .range((
607 Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608 Excluded(BlockRef::new(
609 end_round,
610 AuthorityIndex::MIN,
611 BlockDigest::MIN,
612 )),
613 ))
614 .last()?;
615
616 self.recent_blocks
617 .get(block_ref)
618 .map(|block_info| block_info.block.clone())
619 }
620
621 pub(crate) fn get_last_cached_block_per_authority(
628 &self,
629 end_round: Round,
630 ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631 let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633 let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635 if end_round == GENESIS_ROUND {
636 panic!(
637 "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638 );
639 }
640
641 if end_round == GENESIS_ROUND + 1 {
642 return blocks.into_iter().map(|b| (b, vec![])).collect();
643 }
644
645 for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646 let authority_index = self
647 .context
648 .committee
649 .to_authority_index(authority_index)
650 .unwrap();
651
652 let last_evicted_round = self.evicted_rounds[authority_index];
653 if end_round.saturating_sub(1) <= last_evicted_round {
654 panic!(
655 "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656 );
657 }
658
659 let block_ref_iter = block_refs
660 .range((
661 Included(BlockRef::new(
662 last_evicted_round + 1,
663 authority_index,
664 BlockDigest::MIN,
665 )),
666 Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667 ))
668 .rev();
669
670 let mut last_round = 0;
671 for block_ref in block_ref_iter {
672 if last_round == 0 {
673 last_round = block_ref.round;
674 let block_info = self
675 .recent_blocks
676 .get(block_ref)
677 .expect("Block should exist in recent blocks");
678 blocks[authority_index] = block_info.block.clone();
679 continue;
680 }
681 if block_ref.round < last_round {
682 break;
683 }
684 equivocating_blocks[authority_index].push(*block_ref);
685 }
686 }
687
688 blocks.into_iter().zip(equivocating_blocks).collect()
689 }
690
691 pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694 if slot.round == GENESIS_ROUND {
696 return true;
697 }
698
699 let eviction_round = self.evicted_rounds[slot.authority];
700 if slot.round <= eviction_round {
701 panic!(
702 "{}",
703 format!(
704 "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705 )
706 );
707 }
708
709 let mut result = self.recent_refs_by_authority[slot.authority].range((
710 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711 Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712 ));
713 result.next().is_some()
714 }
715
716 pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719 let mut exist = vec![false; block_refs.len()];
720 let mut missing = Vec::new();
721
722 for (index, block_ref) in block_refs.into_iter().enumerate() {
723 let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724 if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725 exist[index] = true;
726 } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727 {
728 exist[index] = false;
732 } else {
733 missing.push((index, block_ref));
734 }
735 }
736
737 if missing.is_empty() {
738 return exist;
739 }
740
741 let missing_refs = missing
742 .iter()
743 .map(|(_, block_ref)| *block_ref)
744 .collect::<Vec<_>>();
745 let store_results = self
746 .store
747 .contains_blocks(&missing_refs)
748 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749 self.context
750 .metrics
751 .node_metrics
752 .dag_state_store_read_count
753 .with_label_values(&["contains_blocks"])
754 .inc();
755
756 for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757 exist[index] = result;
758 }
759
760 exist
761 }
762
763 pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764 let blocks = self.contains_blocks(vec![*block_ref]);
765 blocks.first().cloned().unwrap()
766 }
767
768 pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771 if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772 if !block_info.committed {
773 block_info.committed = true;
774 return true;
775 }
776 false
777 } else {
778 panic!(
779 "Block {:?} not found in cache to set as committed.",
780 block_ref
781 );
782 }
783 }
784
785 pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787 self.recent_blocks
788 .get(block_ref)
789 .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790 .committed
791 }
792
793 pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
799 let gc_round = self.gc_round();
800 let mut linked_blocks = vec![];
801 let mut targets = VecDeque::new();
802 targets.push_back(root_block);
803 while let Some(block_ref) = targets.pop_front() {
804 if block_ref.round <= gc_round {
812 continue;
813 }
814 let block_info = self
815 .recent_blocks
816 .get_mut(&block_ref)
817 .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
818 if block_info.included {
819 continue;
820 }
821 linked_blocks.push(block_ref);
822 block_info.included = true;
823 targets.extend(block_info.block.ancestors().iter());
824 }
825 linked_blocks
826 }
827
828 pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
831 self.recent_blocks
832 .get(block_ref)
833 .unwrap_or_else(|| {
834 panic!(
835 "Attempted to query for inclusion status for a block not in cached data {}",
836 block_ref
837 )
838 })
839 .included
840 }
841
842 pub(crate) fn threshold_clock_round(&self) -> Round {
843 self.threshold_clock.get_round()
844 }
845
846 pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
848 self.threshold_clock.get_quorum_ts()
849 }
850
851 pub(crate) fn highest_accepted_round(&self) -> Round {
852 self.highest_accepted_round
853 }
854
855 pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
858 let time_diff = if let Some(last_commit) = &self.last_commit {
859 if commit.index() <= last_commit.index() {
860 error!(
861 "New commit index {} <= last commit index {}!",
862 commit.index(),
863 last_commit.index()
864 );
865 return;
866 }
867 assert_eq!(commit.index(), last_commit.index() + 1);
868
869 if commit.timestamp_ms() < last_commit.timestamp_ms() {
870 panic!(
871 "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
872 last_commit, commit
873 );
874 }
875 commit
876 .timestamp_ms()
877 .saturating_sub(last_commit.timestamp_ms())
878 } else {
879 assert_eq!(commit.index(), 1);
880 0
881 };
882
883 self.context
884 .metrics
885 .node_metrics
886 .last_commit_time_diff
887 .observe(time_diff as f64);
888
889 let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
890 previous_commit.round() < commit.round()
891 } else {
892 true
893 };
894
895 self.last_commit = Some(commit.clone());
896
897 if commit_round_advanced {
898 let now = std::time::Instant::now();
899 if let Some(previous_time) = self.last_commit_round_advancement_time {
900 self.context
901 .metrics
902 .node_metrics
903 .commit_round_advancement_interval
904 .observe(now.duration_since(previous_time).as_secs_f64())
905 }
906 self.last_commit_round_advancement_time = Some(now);
907 }
908
909 for block_ref in commit.blocks().iter() {
910 self.last_committed_rounds[block_ref.author] = max(
911 self.last_committed_rounds[block_ref.author],
912 block_ref.round,
913 );
914 }
915
916 for (i, round) in self.last_committed_rounds.iter().enumerate() {
917 let index = self.context.committee.to_authority_index(i).unwrap();
918 let hostname = &self.context.committee.authority(index).hostname;
919 self.context
920 .metrics
921 .node_metrics
922 .last_committed_authority_round
923 .with_label_values(&[hostname])
924 .set((*round).into());
925 }
926
927 self.pending_commit_votes.push_back(commit.reference());
928 self.commits_to_write.push(commit);
929 }
930
931 pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
933 self.commits_to_write.extend(commits);
934 }
935
936 pub(crate) fn ensure_commits_to_write_is_empty(&self) {
937 assert!(
938 self.commits_to_write.is_empty(),
939 "Commits to write should be empty. {:?}",
940 self.commits_to_write,
941 );
942 }
943
944 pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
945 assert!(self.scoring_subdag.is_empty());
949
950 let commit_info = CommitInfo {
951 committed_rounds: self.last_committed_rounds.clone(),
952 reputation_scores,
953 };
954 let last_commit = self
955 .last_commit
956 .as_ref()
957 .expect("Last commit should already be set.");
958 self.commit_info_to_write
959 .push((last_commit.reference(), commit_info));
960 }
961
962 pub(crate) fn add_finalized_commit(
963 &mut self,
964 commit_ref: CommitRef,
965 rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
966 ) {
967 self.finalized_commits_to_write
968 .push((commit_ref, rejected_transactions));
969 }
970
971 pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
972 let mut votes = Vec::new();
973 while !self.pending_commit_votes.is_empty() && votes.len() < limit {
974 votes.push(self.pending_commit_votes.pop_front().unwrap());
975 }
976 votes
977 }
978
979 pub(crate) fn last_commit_index(&self) -> CommitIndex {
981 match &self.last_commit {
982 Some(commit) => commit.index(),
983 None => 0,
984 }
985 }
986
987 pub(crate) fn last_commit_digest(&self) -> CommitDigest {
989 match &self.last_commit {
990 Some(commit) => commit.digest(),
991 None => CommitDigest::MIN,
992 }
993 }
994
995 pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
997 match &self.last_commit {
998 Some(commit) => commit.timestamp_ms(),
999 None => 0,
1000 }
1001 }
1002
1003 pub(crate) fn last_commit_leader(&self) -> Slot {
1005 match &self.last_commit {
1006 Some(commit) => commit.leader().into(),
1007 None => self
1008 .genesis
1009 .iter()
1010 .next()
1011 .map(|(genesis_ref, _)| *genesis_ref)
1012 .expect("Genesis blocks should always be available.")
1013 .into(),
1014 }
1015 }
1016
1017 pub(crate) fn last_commit_round(&self) -> Round {
1019 match &self.last_commit {
1020 Some(commit) => commit.leader().round,
1021 None => 0,
1022 }
1023 }
1024
1025 pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1027 self.last_committed_rounds.clone()
1028 }
1029
1030 pub(crate) fn gc_round(&self) -> Round {
1034 self.calculate_gc_round(self.last_commit_round())
1035 }
1036
1037 pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1040 commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1041 }
1042
1043 pub(crate) fn flush(&mut self) {
1053 let _s = self
1054 .context
1055 .metrics
1056 .node_metrics
1057 .scope_processing_time
1058 .with_label_values(&["DagState::flush"])
1059 .start_timer();
1060
1061 let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1063 let pending_commits = std::mem::take(&mut self.commits_to_write);
1064 let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1065 let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1066 if pending_blocks.is_empty()
1067 && pending_commits.is_empty()
1068 && pending_commit_info.is_empty()
1069 && pending_finalized_commits.is_empty()
1070 {
1071 return;
1072 }
1073
1074 debug!(
1075 "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1076 pending_blocks.len(),
1077 pending_blocks
1078 .iter()
1079 .map(|b| b.reference().to_string())
1080 .join(","),
1081 pending_commits.len(),
1082 pending_commits
1083 .iter()
1084 .map(|c| c.reference().to_string())
1085 .join(","),
1086 pending_commit_info.len(),
1087 pending_commit_info
1088 .iter()
1089 .map(|(commit_ref, _)| commit_ref.to_string())
1090 .join(","),
1091 pending_finalized_commits.len(),
1092 pending_finalized_commits
1093 .iter()
1094 .map(|(commit_ref, _)| commit_ref.to_string())
1095 .join(","),
1096 );
1097 self.store
1098 .write(WriteBatch::new(
1099 pending_blocks,
1100 pending_commits,
1101 pending_commit_info,
1102 pending_finalized_commits,
1103 ))
1104 .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1105 self.context
1106 .metrics
1107 .node_metrics
1108 .dag_state_store_write_count
1109 .inc();
1110
1111 for (authority_index, _) in self.context.committee.authorities() {
1113 let eviction_round = self.calculate_authority_eviction_round(authority_index);
1114 while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1115 if block_ref.round <= eviction_round {
1116 self.recent_blocks.remove(block_ref);
1117 self.recent_refs_by_authority[authority_index].pop_first();
1118 } else {
1119 break;
1120 }
1121 }
1122 self.evicted_rounds[authority_index] = eviction_round;
1123 }
1124
1125 let metrics = &self.context.metrics.node_metrics;
1126 metrics
1127 .dag_state_recent_blocks
1128 .set(self.recent_blocks.len() as i64);
1129 metrics.dag_state_recent_refs.set(
1130 self.recent_refs_by_authority
1131 .iter()
1132 .map(BTreeSet::len)
1133 .sum::<usize>() as i64,
1134 );
1135 }
1136
1137 pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1138 self.store
1139 .read_last_commit_info()
1140 .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1141 }
1142
1143 pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1144 self.scoring_subdag.add_subdags(scoring_subdags);
1145 }
1146
1147 pub(crate) fn clear_scoring_subdag(&mut self) {
1148 self.scoring_subdag.clear();
1149 }
1150
1151 pub(crate) fn scoring_subdags_count(&self) -> usize {
1152 self.scoring_subdag.scored_subdags_count()
1153 }
1154
1155 pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1156 self.scoring_subdag.is_empty()
1157 }
1158
1159 pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1160 self.scoring_subdag.calculate_distributed_vote_scores()
1161 }
1162
1163 pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1164 self.scoring_subdag
1165 .commit_range
1166 .as_ref()
1167 .expect("commit range should exist for scoring subdag")
1168 .end()
1169 }
1170
1171 fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1175 let last_round = self.recent_refs_by_authority[authority_index]
1176 .last()
1177 .map(|block_ref| block_ref.round)
1178 .unwrap_or(GENESIS_ROUND);
1179
1180 Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1181 }
1182
1183 fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1186 gc_round.min(last_round.saturating_sub(cached_rounds))
1187 }
1188
1189 pub(crate) fn store(&self) -> Arc<dyn Store> {
1191 self.store.clone()
1192 }
1193
/// Test helper: returns the blocks of the most recent round, among the highest
/// accepted round and the one before it, that form a quorum. Reaching the
/// genesis round returns the genesis blocks.
///
/// # Panics
/// Panics if neither round holds a quorum.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    let highest = self.highest_accepted_round;
    let lowest = highest.saturating_sub(1);

    // Check the highest round first, then the round before it.
    for round in (lowest..=highest).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        let blocks = self.get_uncommitted_blocks_at_round(round);
        for block in blocks.iter() {
            if quorum.add(block.author(), &self.context.committee) {
                return blocks;
            }
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}
1220
/// Test helper: returns the genesis blocks in reference order.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
    self.genesis
        .iter()
        .map(|(_, block)| block.clone())
        .collect()
}
1225
/// Test helper: overwrites the last commit without touching any other state.
#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
    self.last_commit.replace(commit);
}
1230}
1231
1232struct BlockInfo {
1233 block: VerifiedBlock,
1234 committed: bool,
1236 included: bool,
1243}
1244
1245impl BlockInfo {
1246 fn new(block: VerifiedBlock) -> Self {
1247 Self {
1248 block,
1249 committed: false,
1250 included: false,
1251 }
1252 }
1253}
1254
1255#[cfg(test)]
1256mod test {
1257 use std::vec;
1258
1259 use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1260 use parking_lot::RwLock;
1261
1262 use super::*;
1263 use crate::{
1264 block::{TestBlock, VerifiedBlock},
1265 storage::{WriteBatch, mem_store::MemStore},
1266 test_dag_builder::DagBuilder,
1267 test_dag_parser::parse_dag,
1268 };
1269
1270 #[tokio::test]
1271 async fn test_get_blocks() {
1272 let (context, _) = Context::new_for_test(4);
1273 let context = Arc::new(context);
1274 let store = Arc::new(MemStore::new());
1275 let mut dag_state = DagState::new(context.clone(), store.clone());
1276 let own_index = AuthorityIndex::new_for_test(0);
1277
1278 let num_rounds: u32 = 10;
1280 let non_existent_round: u32 = 100;
1281 let num_authorities: u32 = 3;
1282 let num_blocks_per_slot: usize = 3;
1283 let mut blocks = BTreeMap::new();
1284 for round in 1..=num_rounds {
1285 for author in 0..num_authorities {
1286 let base_ts = round as BlockTimestampMs * 1000;
1288 for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1289 let block = VerifiedBlock::new_for_test(
1290 TestBlock::new(round, author)
1291 .set_timestamp_ms(timestamp)
1292 .build(),
1293 );
1294 dag_state.accept_block(block.clone());
1295 blocks.insert(block.reference(), block);
1296
1297 if AuthorityIndex::new_for_test(author) == own_index {
1299 break;
1300 }
1301 }
1302 }
1303 }
1304
1305 for (r, block) in &blocks {
1307 assert_eq!(&dag_state.get_block(r).unwrap(), block);
1308 }
1309
1310 let last_ref = blocks.keys().last().unwrap();
1312 assert!(
1313 dag_state
1314 .get_block(&BlockRef::new(
1315 last_ref.round,
1316 last_ref.author,
1317 BlockDigest::MIN
1318 ))
1319 .is_none()
1320 );
1321
1322 for round in 1..=num_rounds {
1324 for author in 0..num_authorities {
1325 let slot = Slot::new(
1326 round,
1327 context
1328 .committee
1329 .to_authority_index(author as usize)
1330 .unwrap(),
1331 );
1332 let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1333
1334 if AuthorityIndex::new_for_test(author) == own_index {
1336 assert_eq!(blocks.len(), 1);
1337 } else {
1338 assert_eq!(blocks.len(), num_blocks_per_slot);
1339 }
1340
1341 for b in blocks {
1342 assert_eq!(b.round(), round);
1343 assert_eq!(
1344 b.author(),
1345 context
1346 .committee
1347 .to_authority_index(author as usize)
1348 .unwrap()
1349 );
1350 }
1351 }
1352 }
1353
1354 let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1356 assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1357
1358 for round in 1..=num_rounds {
1360 let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1361 assert_eq!(
1364 blocks.len(),
1365 (num_authorities - 1) as usize * num_blocks_per_slot + 1
1366 );
1367 for b in blocks {
1368 assert_eq!(b.round(), round);
1369 }
1370 }
1371
1372 assert!(
1374 dag_state
1375 .get_uncommitted_blocks_at_round(non_existent_round)
1376 .is_empty()
1377 );
1378 }
1379
1380 #[tokio::test]
1381 async fn test_ancestors_at_uncommitted_round() {
1382 let (context, _) = Context::new_for_test(4);
1384 let context = Arc::new(context);
1385 let store = Arc::new(MemStore::new());
1386 let mut dag_state = DagState::new(context.clone(), store.clone());
1387
1388 let round_10_refs: Vec<_> = (0..4)
1392 .map(|a| {
1393 VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1394 .reference()
1395 })
1396 .collect();
1397
1398 let round_11 = vec![
1400 VerifiedBlock::new_for_test(
1402 TestBlock::new(11, 0)
1403 .set_timestamp_ms(1100)
1404 .set_ancestors(round_10_refs.clone())
1405 .build(),
1406 ),
1407 VerifiedBlock::new_for_test(
1410 TestBlock::new(11, 1)
1411 .set_timestamp_ms(1110)
1412 .set_ancestors(round_10_refs.clone())
1413 .build(),
1414 ),
1415 VerifiedBlock::new_for_test(
1417 TestBlock::new(11, 1)
1418 .set_timestamp_ms(1111)
1419 .set_ancestors(round_10_refs.clone())
1420 .build(),
1421 ),
1422 VerifiedBlock::new_for_test(
1424 TestBlock::new(11, 1)
1425 .set_timestamp_ms(1112)
1426 .set_ancestors(round_10_refs.clone())
1427 .build(),
1428 ),
1429 VerifiedBlock::new_for_test(
1431 TestBlock::new(11, 2)
1432 .set_timestamp_ms(1120)
1433 .set_ancestors(round_10_refs.clone())
1434 .build(),
1435 ),
1436 VerifiedBlock::new_for_test(
1438 TestBlock::new(11, 3)
1439 .set_timestamp_ms(1130)
1440 .set_ancestors(round_10_refs.clone())
1441 .build(),
1442 ),
1443 ];
1444
1445 let ancestors_for_round_12 = vec![
1447 round_11[0].reference(),
1448 round_11[1].reference(),
1449 round_11[5].reference(),
1450 ];
1451 let round_12 = vec![
1452 VerifiedBlock::new_for_test(
1453 TestBlock::new(12, 0)
1454 .set_timestamp_ms(1200)
1455 .set_ancestors(ancestors_for_round_12.clone())
1456 .build(),
1457 ),
1458 VerifiedBlock::new_for_test(
1459 TestBlock::new(12, 2)
1460 .set_timestamp_ms(1220)
1461 .set_ancestors(ancestors_for_round_12.clone())
1462 .build(),
1463 ),
1464 VerifiedBlock::new_for_test(
1465 TestBlock::new(12, 3)
1466 .set_timestamp_ms(1230)
1467 .set_ancestors(ancestors_for_round_12.clone())
1468 .build(),
1469 ),
1470 ];
1471
1472 let ancestors_for_round_13 = vec![
1474 round_12[0].reference(),
1475 round_12[1].reference(),
1476 round_12[2].reference(),
1477 round_11[2].reference(),
1478 ];
1479 let round_13 = vec![
1480 VerifiedBlock::new_for_test(
1481 TestBlock::new(12, 1)
1482 .set_timestamp_ms(1300)
1483 .set_ancestors(ancestors_for_round_13.clone())
1484 .build(),
1485 ),
1486 VerifiedBlock::new_for_test(
1487 TestBlock::new(12, 2)
1488 .set_timestamp_ms(1320)
1489 .set_ancestors(ancestors_for_round_13.clone())
1490 .build(),
1491 ),
1492 VerifiedBlock::new_for_test(
1493 TestBlock::new(12, 3)
1494 .set_timestamp_ms(1330)
1495 .set_ancestors(ancestors_for_round_13.clone())
1496 .build(),
1497 ),
1498 ];
1499
1500 let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1502 let anchor = VerifiedBlock::new_for_test(
1503 TestBlock::new(14, 1)
1504 .set_timestamp_ms(1410)
1505 .set_ancestors(ancestors_for_round_14)
1506 .build(),
1507 );
1508
1509 for b in round_11
1511 .iter()
1512 .chain(round_12.iter())
1513 .chain(round_13.iter())
1514 .chain([anchor.clone()].iter())
1515 {
1516 dag_state.accept_block(b.clone());
1517 }
1518
1519 let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1521 let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1522 ancestors_refs.sort();
1523 let mut expected_refs = vec![
1524 round_11[0].reference(),
1525 round_11[1].reference(),
1526 round_11[2].reference(),
1527 round_11[5].reference(),
1528 ];
1529 expected_refs.sort(); assert_eq!(
1531 ancestors_refs, expected_refs,
1532 "Expected round 11 ancestors: {:?}. Got: {:?}",
1533 expected_refs, ancestors_refs
1534 );
1535 }
1536
1537 #[tokio::test]
1538 async fn test_link_causal_history() {
1539 let (mut context, _) = Context::new_for_test(4);
1540 context.parameters.dag_state_cached_rounds = 10;
1541 context
1542 .protocol_config
1543 .set_consensus_gc_depth_for_testing(3);
1544 let context = Arc::new(context);
1545
1546 let store = Arc::new(MemStore::new());
1547 let mut dag_state = DagState::new(context.clone(), store.clone());
1548
1549 let mut dag_builder = DagBuilder::new(context.clone());
1551 dag_builder.layers(1..=3).build();
1552 dag_builder
1553 .layers(4..=6)
1554 .authorities(vec![AuthorityIndex::new_for_test(0)])
1555 .skip_block()
1556 .build();
1557
1558 let all_blocks = dag_builder.all_blocks();
1560 dag_state.accept_blocks(all_blocks.clone());
1561
1562 for block in &all_blocks {
1564 assert!(!dag_state.has_been_included(&block.reference()));
1565 }
1566
1567 let round_1_block = &all_blocks[1];
1569 assert_eq!(round_1_block.round(), 1);
1570 let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1571
1572 assert_eq!(linked_blocks.len(), 1);
1574 assert_eq!(linked_blocks[0], round_1_block.reference());
1575 for block_ref in linked_blocks {
1576 assert!(dag_state.has_been_included(&block_ref));
1577 }
1578
1579 let round_2_block = &all_blocks[4];
1581 assert_eq!(round_2_block.round(), 2);
1582 let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1583
1584 assert_eq!(linked_blocks.len(), 4);
1586 for block_ref in linked_blocks {
1587 assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1588 }
1589
1590 for block in &all_blocks {
1592 if block.round() == 1 || block.reference() == round_2_block.reference() {
1593 assert!(dag_state.has_been_included(&block.reference()));
1594 } else {
1595 assert!(!dag_state.has_been_included(&block.reference()));
1596 }
1597 }
1598
1599 let round_6_block = all_blocks.last().unwrap();
1601 assert_eq!(round_6_block.round(), 6);
1602
1603 let last_commit = TrustedCommit::new_for_test(
1605 6,
1606 CommitDigest::MIN,
1607 context.clock.timestamp_utc_ms(),
1608 round_6_block.reference(),
1609 vec![],
1610 );
1611 dag_state.set_last_commit(last_commit);
1612 assert_eq!(
1613 dag_state.gc_round(),
1614 3,
1615 "GC round should have moved to round 3"
1616 );
1617
1618 let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1620
1621 assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1623 for block_ref in linked_blocks {
1624 assert!(
1625 block_ref.round == 4
1626 || block_ref.round == 5
1627 || block_ref == round_6_block.reference()
1628 );
1629 }
1630
1631 for block in &all_blocks {
1633 let block_ref = block.reference();
1634 if block.round() == 1
1635 || block_ref == round_2_block.reference()
1636 || block_ref.round == 4
1637 || block_ref.round == 5
1638 || block_ref == round_6_block.reference()
1639 {
1640 assert!(dag_state.has_been_included(&block.reference()));
1641 } else {
1642 assert!(!dag_state.has_been_included(&block.reference()));
1643 }
1644 }
1645 }
1646
1647 #[tokio::test]
1648 async fn test_contains_blocks_in_cache_or_store() {
1649 const CACHED_ROUNDS: Round = 2;
1651
1652 let (mut context, _) = Context::new_for_test(4);
1653 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1654
1655 let context = Arc::new(context);
1656 let store = Arc::new(MemStore::new());
1657 let mut dag_state = DagState::new(context.clone(), store.clone());
1658
1659 let num_rounds: u32 = 10;
1661 let num_authorities: u32 = 4;
1662 let mut blocks = Vec::new();
1663
1664 for round in 1..=num_rounds {
1665 for author in 0..num_authorities {
1666 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1667 blocks.push(block);
1668 }
1669 }
1670
1671 blocks.clone().into_iter().for_each(|block| {
1673 if block.round() <= 4 {
1674 store
1675 .write(WriteBatch::default().blocks(vec![block]))
1676 .unwrap();
1677 } else {
1678 dag_state.accept_blocks(vec![block]);
1679 }
1680 });
1681
1682 let mut block_refs = blocks
1685 .iter()
1686 .map(|block| block.reference())
1687 .collect::<Vec<_>>();
1688 let result = dag_state.contains_blocks(block_refs.clone());
1689
1690 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1692 assert_eq!(result, expected);
1693
1694 block_refs.insert(
1696 3,
1697 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1698 );
1699 let result = dag_state.contains_blocks(block_refs.clone());
1700
1701 expected.insert(3, false);
1703 assert_eq!(result, expected.clone());
1704 }
1705
1706 #[tokio::test]
1707 async fn test_contains_cached_block_at_slot() {
1708 const CACHED_ROUNDS: Round = 2;
1710
1711 let num_authorities: u32 = 4;
1712 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1713 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1714
1715 let context = Arc::new(context);
1716 let store = Arc::new(MemStore::new());
1717 let mut dag_state = DagState::new(context.clone(), store.clone());
1718
1719 let num_rounds: u32 = 10;
1721 let mut blocks = Vec::new();
1722
1723 for round in 1..=num_rounds {
1724 for author in 0..num_authorities {
1725 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1726 blocks.push(block.clone());
1727 dag_state.accept_block(block);
1728 }
1729 }
1730
1731 for (author, _) in context.committee.authorities() {
1733 assert!(
1734 dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1735 "Genesis should always be found"
1736 );
1737 }
1738
1739 let mut block_refs = blocks
1742 .iter()
1743 .map(|block| block.reference())
1744 .collect::<Vec<_>>();
1745
1746 for block_ref in block_refs.clone() {
1747 let slot = block_ref.into();
1748 let found = dag_state.contains_cached_block_at_slot(slot);
1749 assert!(found, "A block should be found at slot {}", slot);
1750 }
1751
1752 block_refs.insert(
1755 3,
1756 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1757 );
1758 let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1759 expected.insert(3, false);
1760
1761 for block_ref in block_refs {
1763 let slot = block_ref.into();
1764 let found = dag_state.contains_cached_block_at_slot(slot);
1765
1766 assert_eq!(expected.remove(0), found);
1767 }
1768 }
1769
1770 #[tokio::test]
1771 #[ignore]
1772 #[should_panic(
1773 expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1774 )]
1775 async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1776 const GC_DEPTH: u32 = 2;
1779 const CACHED_ROUNDS: Round = 3;
1781
1782 let (mut context, _) = Context::new_for_test(4);
1783 context
1784 .protocol_config
1785 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1786 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1787
1788 let context = Arc::new(context);
1789 let store = Arc::new(MemStore::new());
1790 let mut dag_state = DagState::new(context.clone(), store.clone());
1791
1792 let mut dag_builder = DagBuilder::new(context.clone());
1794 dag_builder.layers(1..=3).build();
1795 dag_builder
1796 .layers(4..=6)
1797 .authorities(vec![AuthorityIndex::new_for_test(0)])
1798 .skip_block()
1799 .build();
1800
1801 dag_builder
1803 .all_blocks()
1804 .into_iter()
1805 .for_each(|block| dag_state.accept_block(block));
1806
1807 dag_state.add_commit(TrustedCommit::new_for_test(
1809 1 as CommitIndex,
1810 CommitDigest::MIN,
1811 0,
1812 dag_builder.leader_block(5).unwrap().reference(),
1813 vec![],
1814 ));
1815 dag_state.flush();
1817
1818 assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1820
1821 for authority_index in 1..=3 {
1825 for round in 4..=6 {
1826 assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1827 round,
1828 AuthorityIndex::new_for_test(authority_index)
1829 )));
1830 }
1831 }
1832
1833 for round in 1..=3 {
1834 assert!(
1835 dag_state.contains_cached_block_at_slot(Slot::new(
1836 round,
1837 AuthorityIndex::new_for_test(0)
1838 ))
1839 );
1840 }
1841
1842 let _ =
1845 dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1846 }
1847
1848 #[tokio::test]
1849 async fn test_get_blocks_in_cache_or_store() {
1850 let (context, _) = Context::new_for_test(4);
1851 let context = Arc::new(context);
1852 let store = Arc::new(MemStore::new());
1853 let mut dag_state = DagState::new(context.clone(), store.clone());
1854
1855 let num_rounds: u32 = 10;
1857 let num_authorities: u32 = 4;
1858 let mut blocks = Vec::new();
1859
1860 for round in 1..=num_rounds {
1861 for author in 0..num_authorities {
1862 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1863 blocks.push(block);
1864 }
1865 }
1866
1867 blocks.clone().into_iter().for_each(|block| {
1869 if block.round() <= 4 {
1870 store
1871 .write(WriteBatch::default().blocks(vec![block]))
1872 .unwrap();
1873 } else {
1874 dag_state.accept_blocks(vec![block]);
1875 }
1876 });
1877
1878 let mut block_refs = blocks
1881 .iter()
1882 .map(|block| block.reference())
1883 .collect::<Vec<_>>();
1884 let result = dag_state.get_blocks(&block_refs);
1885
1886 let mut expected = blocks
1887 .into_iter()
1888 .map(Some)
1889 .collect::<Vec<Option<VerifiedBlock>>>();
1890
1891 assert_eq!(result, expected.clone());
1893
1894 block_refs.insert(
1896 3,
1897 BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1898 );
1899 let result = dag_state.get_blocks(&block_refs);
1900
1901 expected.insert(3, None);
1903 assert_eq!(result, expected);
1904 }
1905
1906 #[tokio::test]
1907 async fn test_flush_and_recovery() {
1908 telemetry_subscribers::init_for_testing();
1909
1910 const GC_DEPTH: u32 = 3;
1911 const CACHED_ROUNDS: u32 = 4;
1912
1913 let num_authorities: u32 = 4;
1914 let (mut context, _) = Context::new_for_test(num_authorities as usize);
1915 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1916 context
1917 .protocol_config
1918 .set_consensus_gc_depth_for_testing(GC_DEPTH);
1919
1920 let context = Arc::new(context);
1921
1922 let store = Arc::new(MemStore::new());
1923 let mut dag_state = DagState::new(context.clone(), store.clone());
1924
1925 const NUM_ROUNDS: Round = 20;
1926 let mut dag_builder = DagBuilder::new(context.clone());
1927 dag_builder.layers(1..=5).build();
1928 dag_builder
1929 .layers(6..=8)
1930 .authorities(vec![AuthorityIndex::new_for_test(0)])
1931 .skip_block()
1932 .build();
1933 dag_builder.layers(9..=NUM_ROUNDS).build();
1934
1935 const LAST_COMMIT_ROUND: Round = 16;
1937 const LAST_COMMIT_INDEX: CommitIndex = 15;
1938 let commits = dag_builder
1939 .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1940 .into_iter()
1941 .map(|(_subdag, commit)| commit)
1942 .take(LAST_COMMIT_INDEX as usize)
1943 .collect::<Vec<_>>();
1944 assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1945 assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1946
1947 const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1951 const NUM_PERSISTED_COMMITS: usize = 8;
1952 const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1953 const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1954 dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1955 let mut finalized_commits = vec![];
1956 for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1957 finalized_commits.push(commit.clone());
1958 dag_state.add_commit(commit);
1959 }
1960 let last_finalized_commit = finalized_commits.last().unwrap();
1961 assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1962 assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1963
1964 let finalized_blocks = finalized_commits
1966 .iter()
1967 .flat_map(|commit| commit.blocks())
1968 .collect::<BTreeSet<_>>();
1969
1970 dag_state.flush();
1972
1973 let store_blocks = store
1975 .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1976 .unwrap();
1977 assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1978 let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1979 assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1980 assert_eq!(
1981 store_commits.last().unwrap().index(),
1982 LAST_PERSISTED_COMMIT_INDEX
1983 );
1984 assert_eq!(
1985 store_commits.last().unwrap().round(),
1986 LAST_PERSISTED_COMMIT_ROUND
1987 );
1988
1989 dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1991 for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1992 dag_state.add_commit(commit);
1993 }
1994
1995 let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
1997 let block_refs = all_blocks
1998 .iter()
1999 .map(|block| block.reference())
2000 .collect::<Vec<_>>();
2001 let result = dag_state
2002 .get_blocks(&block_refs)
2003 .into_iter()
2004 .map(|b| b.unwrap())
2005 .collect::<Vec<_>>();
2006 assert_eq!(result, all_blocks);
2007
2008 assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2010
2011 drop(dag_state);
2013
2014 let dag_state = DagState::new(context.clone(), store.clone());
2016
2017 let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2019 let block_refs = all_blocks
2020 .iter()
2021 .map(|block| block.reference())
2022 .collect::<Vec<_>>();
2023 let result = dag_state
2024 .get_blocks(&block_refs)
2025 .into_iter()
2026 .map(|b| b.unwrap())
2027 .collect::<Vec<_>>();
2028 assert_eq!(result, all_blocks);
2029
2030 let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2032 let block_refs = missing_blocks
2033 .iter()
2034 .map(|block| block.reference())
2035 .collect::<Vec<_>>();
2036 let retrieved_blocks = dag_state
2037 .get_blocks(&block_refs)
2038 .into_iter()
2039 .flatten()
2040 .collect::<Vec<_>>();
2041 assert!(retrieved_blocks.is_empty());
2042
2043 assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2045 assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2046
2047 let expected_last_committed_rounds = vec![5, 9, 8, 8];
2049 assert_eq!(
2050 dag_state.last_committed_rounds(),
2051 expected_last_committed_rounds
2052 );
2053 assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2055
2056 for (authority_index, _) in context.committee.authorities() {
2058 let blocks = dag_state.get_cached_blocks(authority_index, 1);
2059
2060 if authority_index == AuthorityIndex::new_for_test(0) {
2064 assert_eq!(blocks.len(), 4);
2065 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2066 assert!(
2067 blocks
2068 .into_iter()
2069 .all(|block| block.round() >= 7 && block.round() <= 12)
2070 );
2071 } else {
2072 assert_eq!(blocks.len(), 6);
2073 assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2074 assert!(
2075 blocks
2076 .into_iter()
2077 .all(|block| block.round() >= 7 && block.round() <= 12)
2078 );
2079 }
2080 }
2081
2082 let gc_round = dag_state.gc_round();
2084 assert_eq!(gc_round, 6);
2085 dag_state
2086 .recent_blocks
2087 .iter()
2088 .for_each(|(block_ref, block_info)| {
2089 if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2090 assert!(
2091 block_info.committed,
2092 "Block {:?} should be set as committed",
2093 block_ref
2094 );
2095 }
2096 });
2097
2098 dag_state
2103 .recent_blocks
2104 .iter()
2105 .for_each(|(block_ref, block_info)| {
2106 if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2107 assert!(block_info.included);
2108 } else {
2109 assert!(!block_info.included);
2110 }
2111 });
2112 }
2113
2114 #[tokio::test]
2115 async fn test_block_info_as_committed() {
2116 let num_authorities: u32 = 4;
2117 let (context, _) = Context::new_for_test(num_authorities as usize);
2118 let context = Arc::new(context);
2119
2120 let store = Arc::new(MemStore::new());
2121 let mut dag_state = DagState::new(context.clone(), store.clone());
2122
2123 let block = VerifiedBlock::new_for_test(
2125 TestBlock::new(1, 0)
2126 .set_timestamp_ms(1000)
2127 .set_ancestors(vec![])
2128 .build(),
2129 );
2130
2131 dag_state.accept_block(block.clone());
2132
2133 assert!(!dag_state.is_committed(&block.reference()));
2135
2136 assert!(
2138 dag_state.set_committed(&block.reference()),
2139 "Block should be successfully set as committed for first time"
2140 );
2141
2142 assert!(dag_state.is_committed(&block.reference()));
2144
2145 assert!(
2147 !dag_state.set_committed(&block.reference()),
2148 "Block should not be successfully set as committed"
2149 );
2150 }
2151
2152 #[tokio::test]
2153 async fn test_get_cached_blocks() {
2154 let (mut context, _) = Context::new_for_test(4);
2155 context.parameters.dag_state_cached_rounds = 5;
2156
2157 let context = Arc::new(context);
2158 let store = Arc::new(MemStore::new());
2159 let mut dag_state = DagState::new(context.clone(), store.clone());
2160
2161 let mut all_blocks = Vec::new();
2166 for author in 1..=3 {
2167 for round in 10..(10 + author) {
2168 let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2169 all_blocks.push(block.clone());
2170 dag_state.accept_block(block);
2171 }
2172 }
2173
2174 let cached_blocks =
2177 dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2178 assert!(cached_blocks.is_empty());
2179
2180 let cached_blocks =
2181 dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2182 assert_eq!(cached_blocks.len(), 1);
2183 assert_eq!(cached_blocks[0].round(), 10);
2184
2185 let cached_blocks =
2186 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2187 assert_eq!(cached_blocks.len(), 2);
2188 assert_eq!(cached_blocks[0].round(), 10);
2189 assert_eq!(cached_blocks[1].round(), 11);
2190
2191 let cached_blocks =
2192 dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2193 assert_eq!(cached_blocks.len(), 1);
2194 assert_eq!(cached_blocks[0].round(), 11);
2195
2196 let cached_blocks =
2197 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2198 assert_eq!(cached_blocks.len(), 3);
2199 assert_eq!(cached_blocks[0].round(), 10);
2200 assert_eq!(cached_blocks[1].round(), 11);
2201 assert_eq!(cached_blocks[2].round(), 12);
2202
2203 let cached_blocks =
2204 dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2205 assert_eq!(cached_blocks.len(), 1);
2206 assert_eq!(cached_blocks[0].round(), 12);
2207
2208 let cached_blocks = dag_state.get_cached_blocks_in_range(
2212 context.committee.to_authority_index(3).unwrap(),
2213 10,
2214 10,
2215 1,
2216 );
2217 assert!(cached_blocks.is_empty());
2218
2219 let cached_blocks = dag_state.get_cached_blocks_in_range(
2221 context.committee.to_authority_index(3).unwrap(),
2222 11,
2223 10,
2224 1,
2225 );
2226 assert!(cached_blocks.is_empty());
2227
2228 let cached_blocks = dag_state.get_cached_blocks_in_range(
2230 context.committee.to_authority_index(0).unwrap(),
2231 9,
2232 10,
2233 1,
2234 );
2235 assert!(cached_blocks.is_empty());
2236
2237 let cached_blocks = dag_state.get_cached_blocks_in_range(
2239 context.committee.to_authority_index(1).unwrap(),
2240 9,
2241 11,
2242 1,
2243 );
2244 assert_eq!(cached_blocks.len(), 1);
2245 assert_eq!(cached_blocks[0].round(), 10);
2246
2247 let cached_blocks = dag_state.get_cached_blocks_in_range(
2249 context.committee.to_authority_index(2).unwrap(),
2250 9,
2251 12,
2252 5,
2253 );
2254 assert_eq!(cached_blocks.len(), 2);
2255 assert_eq!(cached_blocks[0].round(), 10);
2256 assert_eq!(cached_blocks[1].round(), 11);
2257
2258 let cached_blocks = dag_state.get_cached_blocks_in_range(
2260 context.committee.to_authority_index(3).unwrap(),
2261 11,
2262 20,
2263 5,
2264 );
2265 assert_eq!(cached_blocks.len(), 2);
2266 assert_eq!(cached_blocks[0].round(), 11);
2267 assert_eq!(cached_blocks[1].round(), 12);
2268
2269 let cached_blocks = dag_state.get_cached_blocks_in_range(
2271 context.committee.to_authority_index(3).unwrap(),
2272 10,
2273 20,
2274 1,
2275 );
2276 assert_eq!(cached_blocks.len(), 1);
2277 assert_eq!(cached_blocks[0].round(), 10);
2278 }
2279
2280 #[tokio::test]
2281 async fn test_get_last_cached_block() {
2282 const CACHED_ROUNDS: Round = 2;
2284 const GC_DEPTH: u32 = 1;
2285 let (mut context, _) = Context::new_for_test(4);
2286 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2287 context
2288 .protocol_config
2289 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2290
2291 let context = Arc::new(context);
2292 let store = Arc::new(MemStore::new());
2293 let mut dag_state = DagState::new(context.clone(), store.clone());
2294
2295 let dag_str = "DAG {
2300 Round 0 : { 4 },
2301 Round 1 : {
2302 B -> [*],
2303 C -> [*],
2304 D -> [*],
2305 },
2306 Round 2 : {
2307 C -> [*],
2308 D -> [*],
2309 },
2310 Round 3 : {
2311 D -> [*],
2312 },
2313 }";
2314
2315 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2316
2317 let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2319
2320 for block in dag_builder
2322 .all_blocks()
2323 .into_iter()
2324 .chain(std::iter::once(block))
2325 {
2326 dag_state.accept_block(block);
2327 }
2328
2329 dag_state.add_commit(TrustedCommit::new_for_test(
2330 1 as CommitIndex,
2331 CommitDigest::MIN,
2332 context.clock.timestamp_utc_ms(),
2333 dag_builder.leader_block(3).unwrap().reference(),
2334 vec![],
2335 ));
2336
2337 let end_round = 4;
2339 let expected_rounds = vec![0, 1, 2, 3];
2340 let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2341 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2343 assert_eq!(
2344 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2345 expected_rounds
2346 );
2347 assert_eq!(
2348 last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2349 expected_excluded_and_equivocating_blocks
2350 );
2351
2352 for (i, expected_round) in expected_rounds.iter().enumerate() {
2354 let round = dag_state
2355 .get_last_cached_block_in_range(
2356 context.committee.to_authority_index(i).unwrap(),
2357 0,
2358 end_round,
2359 )
2360 .map(|b| b.round())
2361 .unwrap_or_default();
2362 assert_eq!(round, *expected_round, "Authority {i}");
2363 }
2364
2365 let start_round = 2;
2367 let expected_rounds = [0, 0, 2, 3];
2368
2369 for (i, expected_round) in expected_rounds.iter().enumerate() {
2371 let round = dag_state
2372 .get_last_cached_block_in_range(
2373 context.committee.to_authority_index(i).unwrap(),
2374 start_round,
2375 end_round,
2376 )
2377 .map(|b| b.round())
2378 .unwrap_or_default();
2379 assert_eq!(round, *expected_round, "Authority {i}");
2380 }
2381
2382 dag_state.flush();
2388
2389 let end_round = 3;
2391 let expected_rounds = vec![0, 1, 2, 2];
2392
2393 let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2395 assert_eq!(
2396 last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2397 expected_rounds
2398 );
2399
2400 for (i, expected_round) in expected_rounds.iter().enumerate() {
2402 let round = dag_state
2403 .get_last_cached_block_in_range(
2404 context.committee.to_authority_index(i).unwrap(),
2405 0,
2406 end_round,
2407 )
2408 .map(|b| b.round())
2409 .unwrap_or_default();
2410 assert_eq!(round, *expected_round, "Authority {i}");
2411 }
2412 }
2413
2414 #[tokio::test]
2415 #[should_panic(
2416 expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2417 )]
2418 async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2419 const CACHED_ROUNDS: Round = 1;
2421 const GC_DEPTH: u32 = 1;
2422 let (mut context, _) = Context::new_for_test(4);
2423 context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2424 context
2425 .protocol_config
2426 .set_consensus_gc_depth_for_testing(GC_DEPTH);
2427
2428 let context = Arc::new(context);
2429 let store = Arc::new(MemStore::new());
2430 let mut dag_state = DagState::new(context.clone(), store.clone());
2431
2432 let mut dag_builder = DagBuilder::new(context.clone());
2437 dag_builder
2438 .layers(1..=1)
2439 .authorities(vec![AuthorityIndex::new_for_test(0)])
2440 .skip_block()
2441 .build();
2442 dag_builder
2443 .layers(2..=2)
2444 .authorities(vec![
2445 AuthorityIndex::new_for_test(0),
2446 AuthorityIndex::new_for_test(1),
2447 ])
2448 .skip_block()
2449 .build();
2450 dag_builder
2451 .layers(3..=3)
2452 .authorities(vec![
2453 AuthorityIndex::new_for_test(0),
2454 AuthorityIndex::new_for_test(1),
2455 AuthorityIndex::new_for_test(2),
2456 ])
2457 .skip_block()
2458 .build();
2459
2460 for block in dag_builder.all_blocks() {
2462 dag_state.accept_block(block);
2463 }
2464
2465 dag_state.add_commit(TrustedCommit::new_for_test(
2466 1 as CommitIndex,
2467 CommitDigest::MIN,
2468 0,
2469 dag_builder.leader_block(3).unwrap().reference(),
2470 vec![],
2471 ));
2472
2473 dag_state.flush();
2475
2476 dag_state.get_last_cached_block_per_authority(2);
2478 }
2479
2480 #[tokio::test]
2481 async fn test_last_quorum() {
2482 let (context, _) = Context::new_for_test(4);
2484 let context = Arc::new(context);
2485 let store = Arc::new(MemStore::new());
2486 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2487
2488 {
2490 let genesis = genesis_blocks(context.as_ref());
2491
2492 assert_eq!(dag_state.read().last_quorum(), genesis);
2493 }
2494
2495 {
2497 let mut dag_builder = DagBuilder::new(context.clone());
2498 dag_builder
2499 .layers(1..=4)
2500 .build()
2501 .persist_layers(dag_state.clone());
2502 let round_4_blocks: Vec<_> = dag_builder
2503 .blocks(4..=4)
2504 .into_iter()
2505 .map(|block| block.reference())
2506 .collect();
2507
2508 let last_quorum = dag_state.read().last_quorum();
2509
2510 assert_eq!(
2511 last_quorum
2512 .into_iter()
2513 .map(|block| block.reference())
2514 .collect::<Vec<_>>(),
2515 round_4_blocks
2516 );
2517 }
2518
2519 {
2521 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2522 dag_state.write().accept_block(block);
2523
2524 let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2525
2526 let last_quorum = dag_state.read().last_quorum();
2527
2528 assert_eq!(last_quorum, round_4_blocks);
2529 }
2530 }
2531
2532 #[tokio::test]
2533 async fn test_last_block_for_authority() {
2534 let (context, _) = Context::new_for_test(4);
2536 let context = Arc::new(context);
2537 let store = Arc::new(MemStore::new());
2538 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2539
2540 {
2542 let genesis = genesis_blocks(context.as_ref());
2543 let my_genesis = genesis
2544 .into_iter()
2545 .find(|block| block.author() == context.own_index)
2546 .unwrap();
2547
2548 assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2549 }
2550
2551 {
2553 let mut dag_builder = DagBuilder::new(context.clone());
2555 dag_builder
2556 .layers(1..=4)
2557 .build()
2558 .persist_layers(dag_state.clone());
2559
2560 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2562 dag_state.write().accept_block(block);
2563
2564 let block = dag_state
2565 .read()
2566 .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2567 assert_eq!(block.round(), 5);
2568
2569 for (authority_index, _) in context.committee.authorities() {
2570 let block = dag_state
2571 .read()
2572 .get_last_block_for_authority(authority_index);
2573
2574 if authority_index.value() == 0 {
2575 assert_eq!(block.round(), 5);
2576 } else {
2577 assert_eq!(block.round(), 4);
2578 }
2579 }
2580 }
2581 }
2582
2583 #[tokio::test]
2584 async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2585 let (context, _) = Context::new_for_test(4);
2587 let context = Arc::new(context);
2588 let store = Arc::new(MemStore::new());
2589 let mut dag_state = DagState::new(context.clone(), store.clone());
2590
2591 let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2593
2594 let block = VerifiedBlock::new_for_test(
2595 TestBlock::new(10, 0)
2596 .set_timestamp_ms(block_timestamp)
2597 .build(),
2598 );
2599
2600 dag_state.accept_block(block);
2602 }
2603
2604 #[tokio::test]
2605 async fn test_last_finalized_commit() {
2606 let (context, _) = Context::new_for_test(4);
2608 let context = Arc::new(context);
2609 let store = Arc::new(MemStore::new());
2610 let mut dag_state = DagState::new(context.clone(), store.clone());
2611
2612 let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2614 let rejected_transactions = BTreeMap::new();
2615 dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2616
2617 assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2619 assert_eq!(
2620 dag_state.finalized_commits_to_write[0],
2621 (commit_ref, rejected_transactions.clone())
2622 );
2623
2624 dag_state.flush();
2626
2627 let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2629 assert_eq!(last_finalized_commit, Some(commit_ref));
2630 let stored_rejected_transactions = store
2631 .read_rejected_transactions(commit_ref)
2632 .unwrap()
2633 .unwrap();
2634 assert_eq!(stored_rejected_transactions, rejected_transactions);
2635 }
2636}