consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21    CommittedSubDag,
22    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23    commit::{
24        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26    },
27    context::Context,
28    leader_scoring::{ReputationScores, ScoringSubdag},
29    storage::{Store, WriteBatch},
30    threshold_clock::ThresholdClock,
31};
32
/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
///
/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
/// concurrent access from multiple components.
pub struct DagState {
    // Shared node context. It is used here to reach the committee, the parameters,
    // the metrics, the clock and this node's own authority index.
    context: Arc<Context>,

    // The genesis blocks, keyed by their block refs.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
    // Note: all uncommitted blocks are kept in memory.
    //
    // When GC is enabled, this map has a different semantic. It holds all the recent data for
    // each authority, making sure that CACHED_ROUNDS worth of data is always available.
    // Entries are evicted based on the latest GC round, but the eviction process respects
    // CACHED_ROUNDS: for each authority, blocks are only evicted when their round is less than
    // or equal to both `gc_round` and `highest authority round - cached rounds`.
    // This ensures that the GC requirements are respected (no block above `gc_round` is ever
    // cleaned up), and that there are enough blocks cached.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    // Indexes recent block refs by their authorities.
    // Vec position corresponds to the authority index.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    // Keeps track of the threshold clock for proposing blocks.
    threshold_clock: ThresholdClock,

    // Keeps track of the highest round that has been evicted for each authority. Any blocks of
    // round <= evicted round should be considered evicted, and if any exist they should not be
    // considered causally complete in the order they appear.
    // The `evicted_rounds` size should be the same as the committee size.
    evicted_rounds: Vec<Round>,

    // Highest round of blocks accepted.
    highest_accepted_round: Round,

    // Last consensus commit of the dag. `None` when no commit was read from storage.
    last_commit: Option<TrustedCommit>,

    // Last wall time when commit round advanced. Does not persist across restarts.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    // Last committed rounds per authority.
    // Vec position corresponds to the authority index.
    last_committed_rounds: Vec<Round>,

    /// The committed subdags that have been scored but scores have not been used
    /// for leader schedule yet.
    scoring_subdag: ScoringSubdag,

    // Commit votes pending to be included in new blocks.
    // TODO: limit to 1st commit per round with multi-leader.
    // TODO: recover unproposed pending commit votes at startup.
    pending_commit_votes: VecDeque<CommitVote>,

    // Blocks and commits must be buffered for persistence before they can be
    // inserted into the local DAG or sent to output.
    blocks_to_write: Vec<VerifiedBlock>,
    commits_to_write: Vec<TrustedCommit>,

    // Buffers the reputation scores & last_committed_rounds to be flushed with the
    // next dag state flush. Not writing eagerly is okay because reputation scores
    // & last_committed_rounds can be recovered from the commits as needed.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    // Buffers finalized commits and their rejected transactions to be written to storage.
    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,

    // Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,

    // The number of cached rounds, read from `dag_state_cached_rounds` in the parameters.
    cached_rounds: Round,
}
107
108impl DagState {
109    /// Initializes DagState from storage.
110    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112        let num_authorities = context.committee.size();
113
114        let genesis = genesis_blocks(context.as_ref())
115            .into_iter()
116            .map(|block| (block.reference(), block))
117            .collect();
118
119        let threshold_clock = ThresholdClock::new(1, context.clone());
120
121        let last_commit = store
122            .read_last_commit()
123            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125        let commit_info = store
126            .read_last_commit_info()
127            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128        let (mut last_committed_rounds, commit_recovery_start_index) =
129            if let Some((commit_ref, commit_info)) = commit_info {
130                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131                (commit_info.committed_rounds, commit_ref.index + 1)
132            } else {
133                tracing::info!("Found no stored CommitInfo to recover from");
134                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135            };
136
137        let mut unscored_committed_subdags = Vec::new();
138        let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140        if let Some(last_commit) = last_commit.as_ref() {
141            store
142                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144                .iter()
145                .for_each(|commit| {
146                    for block_ref in commit.blocks() {
147                        last_committed_rounds[block_ref.author] =
148                            max(last_committed_rounds[block_ref.author], block_ref.round);
149                    }
150                    let committed_subdag =
151                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
152                    // We don't need to recover reputation scores for unscored_committed_subdags
153                    unscored_committed_subdags.push(committed_subdag);
154                });
155        }
156
157        tracing::info!(
158            "DagState was initialized with the following state: \
159            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
160            unscored_committed_subdags.len()
161        );
162
163        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
164
165        let mut state = Self {
166            context: context.clone(),
167            genesis,
168            recent_blocks: BTreeMap::new(),
169            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
170            threshold_clock,
171            highest_accepted_round: 0,
172            last_commit: last_commit.clone(),
173            last_commit_round_advancement_time: None,
174            last_committed_rounds: last_committed_rounds.clone(),
175            pending_commit_votes: VecDeque::new(),
176            blocks_to_write: vec![],
177            commits_to_write: vec![],
178            commit_info_to_write: vec![],
179            finalized_commits_to_write: vec![],
180            scoring_subdag,
181            store: store.clone(),
182            cached_rounds,
183            evicted_rounds: vec![0; num_authorities],
184        };
185
186        for (authority_index, _) in context.committee.authorities() {
187            let (blocks, eviction_round) = {
188                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
189                // As reminder, the eviction round is taking into account the gc_round.
190                let last_block = state
191                    .store
192                    .scan_last_blocks_by_author(authority_index, 1, None)
193                    .expect("Database error");
194                let last_block_round = last_block
195                    .last()
196                    .map(|b| b.round())
197                    .unwrap_or(GENESIS_ROUND);
198
199                let eviction_round =
200                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
201                let blocks = state
202                    .store
203                    .scan_blocks_by_author(authority_index, eviction_round + 1)
204                    .expect("Database error");
205
206                (blocks, eviction_round)
207            };
208
209            state.evicted_rounds[authority_index] = eviction_round;
210
211            // Update the block metadata for the authority.
212            for block in &blocks {
213                state.update_block_metadata(block);
214            }
215
216            debug!(
217                "Recovered blocks {}: {:?}",
218                authority_index,
219                blocks
220                    .iter()
221                    .map(|b| b.reference())
222                    .collect::<Vec<BlockRef>>()
223            );
224        }
225
226        if let Some(last_commit) = last_commit {
227            let mut index = last_commit.index();
228            let gc_round = state.gc_round();
229            info!(
230                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
231                index, gc_round
232            );
233
234            loop {
235                let commits = store
236                    .scan_commits((index..=index).into())
237                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
238                let Some(commit) = commits.first() else {
239                    info!("Recovering finished up to index {index}, no more commits to recover");
240                    break;
241                };
242
243                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
244                if gc_round > 0 && commit.leader().round <= gc_round {
245                    info!(
246                        "Recovering finished, reached commit leader round {} <= gc_round {}",
247                        commit.leader().round,
248                        gc_round
249                    );
250                    break;
251                }
252
253                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
254                    debug!(
255                        "Setting block {:?} as committed based on commit {:?}",
256                        block_ref,
257                        commit.index()
258                    );
259                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
260                });
261
262                // All commits are indexed starting from 1, so one reach zero exit.
263                index = index.saturating_sub(1);
264                if index == 0 {
265                    break;
266                }
267            }
268        }
269
270        // Recover hard linked statuses for blocks within GC round.
271        let proposed_blocks = store
272            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
273            .expect("Database error");
274        for block in proposed_blocks {
275            state.link_causal_history(block.reference());
276        }
277
278        state
279    }
280
281    /// Accepts a block into DagState and keeps it in memory.
282    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283        assert_ne!(
284            block.round(),
285            0,
286            "Genesis block should not be accepted into DAG."
287        );
288
289        let block_ref = block.reference();
290        if self.contains_block(&block_ref) {
291            return;
292        }
293
294        let now = self.context.clock.timestamp_utc_ms();
295        if block.timestamp_ms() > now {
296            trace!(
297                "Block {:?} with timestamp {} is greater than local timestamp {}.",
298                block,
299                block.timestamp_ms(),
300                now,
301            );
302        }
303        let hostname = &self.context.committee.authority(block_ref.author).hostname;
304        self.context
305            .metrics
306            .node_metrics
307            .accepted_block_time_drift_ms
308            .with_label_values(&[hostname])
309            .inc_by(block.timestamp_ms().saturating_sub(now));
310
311        // TODO: Move this check to core
312        // Ensure we don't write multiple blocks per slot for our own index
313        if block_ref.author == self.context.own_index {
314            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315            assert!(
316                existing_blocks.is_empty(),
317                "Block Rejected! Attempted to add block {block:#?} to own slot where \
318                block(s) {existing_blocks:#?} already exists."
319            );
320        }
321        self.update_block_metadata(&block);
322        self.blocks_to_write.push(block);
323        let source = if self.context.own_index == block_ref.author {
324            "own"
325        } else {
326            "others"
327        };
328        self.context
329            .metrics
330            .node_metrics
331            .accepted_blocks
332            .with_label_values(&[source])
333            .inc();
334    }
335
336    /// Updates internal metadata for a block.
337    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338        let block_ref = block.reference();
339        self.recent_blocks
340            .insert(block_ref, BlockInfo::new(block.clone()));
341        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343        if self.threshold_clock.add_block(block_ref) {
344            // Do not measure quorum delay when no local block is proposed in the round.
345            let last_proposed_block = self.get_last_proposed_block();
346            if last_proposed_block.round() == block_ref.round {
347                let quorum_delay_ms = self
348                    .context
349                    .clock
350                    .timestamp_utc_ms()
351                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352                self.context
353                    .metrics
354                    .node_metrics
355                    .quorum_receive_latency
356                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357            }
358        }
359
360        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361        self.context
362            .metrics
363            .node_metrics
364            .highest_accepted_round
365            .set(self.highest_accepted_round as i64);
366
367        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368            .last()
369            .map(|block_ref| block_ref.round)
370            .expect("There should be by now at least one block ref");
371        let hostname = &self.context.committee.authority(block_ref.author).hostname;
372        self.context
373            .metrics
374            .node_metrics
375            .highest_accepted_authority_round
376            .with_label_values(&[hostname])
377            .set(highest_accepted_round_for_author as i64);
378    }
379
380    /// Accepts a blocks into DagState and keeps it in memory.
381    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382        debug!(
383            "Accepting blocks: {}",
384            blocks.iter().map(|b| b.reference().to_string()).join(",")
385        );
386        for block in blocks {
387            self.accept_block(block);
388        }
389    }
390
391    /// Gets a block by checking cached recent blocks then storage.
392    /// Returns None when the block is not found.
393    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394        self.get_blocks(&[*reference])
395            .pop()
396            .expect("Exactly one element should be returned")
397    }
398
399    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
400    /// An element is None when the corresponding block is not found.
401    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402        let mut blocks = vec![None; block_refs.len()];
403        let mut missing = Vec::new();
404
405        for (index, block_ref) in block_refs.iter().enumerate() {
406            if block_ref.round == GENESIS_ROUND {
407                // Allow the caller to handle the invalid genesis ancestor error.
408                if let Some(block) = self.genesis.get(block_ref) {
409                    blocks[index] = Some(block.clone());
410                }
411                continue;
412            }
413            if let Some(block_info) = self.recent_blocks.get(block_ref) {
414                blocks[index] = Some(block_info.block.clone());
415                continue;
416            }
417            missing.push((index, block_ref));
418        }
419
420        if missing.is_empty() {
421            return blocks;
422        }
423
424        let missing_refs = missing
425            .iter()
426            .map(|(_, block_ref)| **block_ref)
427            .collect::<Vec<_>>();
428        let store_results = self
429            .store
430            .read_blocks(&missing_refs)
431            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432        self.context
433            .metrics
434            .node_metrics
435            .dag_state_store_read_count
436            .with_label_values(&["get_blocks"])
437            .inc();
438
439        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440            blocks[index] = result;
441        }
442
443        blocks
444    }
445
446    /// Gets all uncommitted blocks in a slot.
447    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
448    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449        // TODO: either panic below when the slot is at or below the last committed round,
450        // or support reading from storage while limiting storage reads to edge cases.
451
452        let mut blocks = vec![];
453        for (_block_ref, block_info) in self.recent_blocks.range((
454            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456        )) {
457            blocks.push(block_info.block.clone())
458        }
459        blocks
460    }
461
462    /// Gets all uncommitted blocks in a round.
463    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
464    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465        if round <= self.last_commit_round() {
466            panic!("Round {} have committed blocks!", round);
467        }
468
469        let mut blocks = vec![];
470        for (_block_ref, block_info) in self.recent_blocks.range((
471            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472            Excluded(BlockRef::new(
473                round + 1,
474                AuthorityIndex::ZERO,
475                BlockDigest::MIN,
476            )),
477        )) {
478            blocks.push(block_info.block.clone())
479        }
480        blocks
481    }
482
483    /// Gets all ancestors in the history of a block at a certain round.
484    pub(crate) fn ancestors_at_round(
485        &self,
486        later_block: &VerifiedBlock,
487        earlier_round: Round,
488    ) -> Vec<VerifiedBlock> {
489        // Iterate through ancestors of later_block in round descending order.
490        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491        while !linked.is_empty() {
492            let round = linked.last().unwrap().round;
493            // Stop after finishing traversal for ancestors above earlier_round.
494            if round <= earlier_round {
495                break;
496            }
497            let block_ref = linked.pop_last().unwrap();
498            let Some(block) = self.get_block(&block_ref) else {
499                panic!("Block {:?} should exist in DAG!", block_ref);
500            };
501            linked.extend(block.ancestors().iter().cloned());
502        }
503        linked
504            .range((
505                Included(BlockRef::new(
506                    earlier_round,
507                    AuthorityIndex::ZERO,
508                    BlockDigest::MIN,
509                )),
510                Unbounded,
511            ))
512            .map(|r| {
513                self.get_block(r)
514                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515                    .clone()
516            })
517            .collect()
518    }
519
520    /// Gets the last proposed block from this authority.
521    /// If no block is proposed yet, returns the genesis block.
522    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
523        self.get_last_block_for_authority(self.context.own_index)
524    }
525
526    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
527    /// then the genesis block is returned as no other block has been received from that authority.
528    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529        if let Some(last) = self.recent_refs_by_authority[authority].last() {
530            return self
531                .recent_blocks
532                .get(last)
533                .expect("Block should be found in recent blocks")
534                .block
535                .clone();
536        }
537
538        // if none exists, then fallback to genesis
539        let (_, genesis_block) = self
540            .genesis
541            .iter()
542            .find(|(block_ref, _)| block_ref.author == authority)
543            .expect("Genesis should be found for authority {authority_index}");
544        genesis_block.clone()
545    }
546
547    /// Returns cached recent blocks from the specified authority.
548    /// Blocks returned are limited to round >= `start`, and cached.
549    /// NOTE: caller should not assume returned blocks are always chained.
550    /// "Disconnected" blocks can be returned when there are byzantine blocks,
551    /// or a previously evicted block is accepted again.
552    pub(crate) fn get_cached_blocks(
553        &self,
554        authority: AuthorityIndex,
555        start: Round,
556    ) -> Vec<VerifiedBlock> {
557        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
558    }
559
560    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
561    // limited in total number of blocks.
562    pub(crate) fn get_cached_blocks_in_range(
563        &self,
564        authority: AuthorityIndex,
565        start_round: Round,
566        end_round: Round,
567        limit: usize,
568    ) -> Vec<VerifiedBlock> {
569        if start_round >= end_round || limit == 0 {
570            return vec![];
571        }
572
573        let mut blocks = vec![];
574        for block_ref in self.recent_refs_by_authority[authority].range((
575            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576            Excluded(BlockRef::new(
577                end_round,
578                AuthorityIndex::MIN,
579                BlockDigest::MIN,
580            )),
581        )) {
582            let block_info = self
583                .recent_blocks
584                .get(block_ref)
585                .expect("Block should exist in recent blocks");
586            blocks.push(block_info.block.clone());
587            if blocks.len() >= limit {
588                break;
589            }
590        }
591        blocks
592    }
593
594    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
595    pub(crate) fn get_last_cached_block_in_range(
596        &self,
597        authority: AuthorityIndex,
598        start_round: Round,
599        end_round: Round,
600    ) -> Option<VerifiedBlock> {
601        if start_round >= end_round {
602            return None;
603        }
604
605        let block_ref = self.recent_refs_by_authority[authority]
606            .range((
607                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608                Excluded(BlockRef::new(
609                    end_round,
610                    AuthorityIndex::MIN,
611                    BlockDigest::MIN,
612                )),
613            ))
614            .last()?;
615
616        self.recent_blocks
617            .get(block_ref)
618            .map(|block_info| block_info.block.clone())
619    }
620
621    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
622    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
623    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
624    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
625    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
626    /// and the other equivocating blocks will be returned.
627    pub(crate) fn get_last_cached_block_per_authority(
628        &self,
629        end_round: Round,
630    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631        // Initialize with the genesis blocks as fallback
632        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635        if end_round == GENESIS_ROUND {
636            panic!(
637                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638            );
639        }
640
641        if end_round == GENESIS_ROUND + 1 {
642            return blocks.into_iter().map(|b| (b, vec![])).collect();
643        }
644
645        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646            let authority_index = self
647                .context
648                .committee
649                .to_authority_index(authority_index)
650                .unwrap();
651
652            let last_evicted_round = self.evicted_rounds[authority_index];
653            if end_round.saturating_sub(1) <= last_evicted_round {
654                panic!(
655                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656                );
657            }
658
659            let block_ref_iter = block_refs
660                .range((
661                    Included(BlockRef::new(
662                        last_evicted_round + 1,
663                        authority_index,
664                        BlockDigest::MIN,
665                    )),
666                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667                ))
668                .rev();
669
670            let mut last_round = 0;
671            for block_ref in block_ref_iter {
672                if last_round == 0 {
673                    last_round = block_ref.round;
674                    let block_info = self
675                        .recent_blocks
676                        .get(block_ref)
677                        .expect("Block should exist in recent blocks");
678                    blocks[authority_index] = block_info.block.clone();
679                    continue;
680                }
681                if block_ref.round < last_round {
682                    break;
683                }
684                equivocating_blocks[authority_index].push(*block_ref);
685            }
686        }
687
688        blocks.into_iter().zip(equivocating_blocks).collect()
689    }
690
691    /// Checks whether a block exists in the slot. The method checks only against the cached data.
692    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
693    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694        // Always return true for genesis slots.
695        if slot.round == GENESIS_ROUND {
696            return true;
697        }
698
699        let eviction_round = self.evicted_rounds[slot.authority];
700        if slot.round <= eviction_round {
701            panic!(
702                "{}",
703                format!(
704                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705                )
706            );
707        }
708
709        let mut result = self.recent_refs_by_authority[slot.authority].range((
710            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712        ));
713        result.next().is_some()
714    }
715
716    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
717    /// back the results, so its expensive if keep asking for cache missing blocks.
718    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719        let mut exist = vec![false; block_refs.len()];
720        let mut missing = Vec::new();
721
722        for (index, block_ref) in block_refs.into_iter().enumerate() {
723            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725                exist[index] = true;
726            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727            {
728                // Optimization: recent_refs contain the most recent blocks known to this authority.
729                // If a block ref is not found there and has a higher round, it definitely is
730                // missing from this authority and there is no need to check disk.
731                exist[index] = false;
732            } else {
733                missing.push((index, block_ref));
734            }
735        }
736
737        if missing.is_empty() {
738            return exist;
739        }
740
741        let missing_refs = missing
742            .iter()
743            .map(|(_, block_ref)| *block_ref)
744            .collect::<Vec<_>>();
745        let store_results = self
746            .store
747            .contains_blocks(&missing_refs)
748            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749        self.context
750            .metrics
751            .node_metrics
752            .dag_state_store_read_count
753            .with_label_values(&["contains_blocks"])
754            .inc();
755
756        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757            exist[index] = result;
758        }
759
760        exist
761    }
762
763    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764        let blocks = self.contains_blocks(vec![*block_ref]);
765        blocks.first().cloned().unwrap()
766    }
767
768    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
769    // Method will panic if the block is not found in the cache.
770    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772            if !block_info.committed {
773                block_info.committed = true;
774                return true;
775            }
776            false
777        } else {
778            panic!(
779                "Block {:?} not found in cache to set as committed.",
780                block_ref
781            );
782        }
783    }
784
785    /// Returns true if the block is committed. Only valid for blocks above the GC round.
786    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787        self.recent_blocks
788            .get(block_ref)
789            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790            .committed
791    }
792
793    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
794    /// Returns the list of blocks that are newly linked.
795    /// The returned blocks are guaranteed to be above the GC round.
796    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
797        let gc_round = self.gc_round();
798        let mut linked_blocks = vec![];
799        let mut targets = VecDeque::new();
800        targets.push_back(root_block);
801        while let Some(block_ref) = targets.pop_front() {
802            // This is only correct with GC enabled.
803            if block_ref.round <= gc_round {
804                continue;
805            }
806            let block_info = self
807                .recent_blocks
808                .get_mut(&block_ref)
809                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
810            if block_info.included {
811                continue;
812            }
813            linked_blocks.push(block_ref);
814            block_info.included = true;
815            targets.extend(block_info.block.ancestors().iter());
816        }
817        linked_blocks
818    }
819
820    /// Returns true if the block has been included in an owned proposed block.
821    /// NOTE: caller should make sure only blocks above GC round are queried.
822    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
823        self.recent_blocks
824            .get(block_ref)
825            .unwrap_or_else(|| {
826                panic!(
827                    "Attempted to query for inclusion status for a block not in cached data {}",
828                    block_ref
829                )
830            })
831            .included
832    }
833
    /// Returns the current round reported by the threshold clock.
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }
837
    /// The timestamp of when quorum threshold was last reached in the threshold clock.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }
842
    /// Returns the `highest_accepted_round` value tracked by this `DagState`.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
846
847    // Buffers a new commit in memory and updates last committed rounds.
848    // REQUIRED: must not skip over any commit index.
849    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
850        let time_diff = if let Some(last_commit) = &self.last_commit {
851            if commit.index() <= last_commit.index() {
852                error!(
853                    "New commit index {} <= last commit index {}!",
854                    commit.index(),
855                    last_commit.index()
856                );
857                return;
858            }
859            assert_eq!(commit.index(), last_commit.index() + 1);
860
861            if commit.timestamp_ms() < last_commit.timestamp_ms() {
862                panic!(
863                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
864                    last_commit, commit
865                );
866            }
867            commit
868                .timestamp_ms()
869                .saturating_sub(last_commit.timestamp_ms())
870        } else {
871            assert_eq!(commit.index(), 1);
872            0
873        };
874
875        self.context
876            .metrics
877            .node_metrics
878            .last_commit_time_diff
879            .observe(time_diff as f64);
880
881        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
882            previous_commit.round() < commit.round()
883        } else {
884            true
885        };
886
887        self.last_commit = Some(commit.clone());
888
889        if commit_round_advanced {
890            let now = std::time::Instant::now();
891            if let Some(previous_time) = self.last_commit_round_advancement_time {
892                self.context
893                    .metrics
894                    .node_metrics
895                    .commit_round_advancement_interval
896                    .observe(now.duration_since(previous_time).as_secs_f64())
897            }
898            self.last_commit_round_advancement_time = Some(now);
899        }
900
901        for block_ref in commit.blocks().iter() {
902            self.last_committed_rounds[block_ref.author] = max(
903                self.last_committed_rounds[block_ref.author],
904                block_ref.round,
905            );
906        }
907
908        for (i, round) in self.last_committed_rounds.iter().enumerate() {
909            let index = self.context.committee.to_authority_index(i).unwrap();
910            let hostname = &self.context.committee.authority(index).hostname;
911            self.context
912                .metrics
913                .node_metrics
914                .last_committed_authority_round
915                .with_label_values(&[hostname])
916                .set((*round).into());
917        }
918
919        self.pending_commit_votes.push_back(commit.reference());
920        self.commits_to_write.push(commit);
921    }
922
    /// Recovers commits to write from storage, at startup.
    ///
    /// The given commits are appended to the `commits_to_write` buffer, which `flush` drains
    /// into the next write batch.
    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
        self.commits_to_write.extend(commits);
    }
927
    /// Asserts that no commits are currently buffered for writing.
    ///
    /// # Panics
    /// Panics, printing the buffered commits, if `commits_to_write` is not empty.
    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
        assert!(
            self.commits_to_write.is_empty(),
            "Commits to write should be empty. {:?}",
            self.commits_to_write,
        );
    }
935
936    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
937        // We create an empty scoring subdag once reputation scores are calculated.
938        // Note: It is okay for this to not be gated by protocol config as the
939        // scoring_subdag should be empty in either case at this point.
940        assert!(self.scoring_subdag.is_empty());
941
942        let commit_info = CommitInfo {
943            committed_rounds: self.last_committed_rounds.clone(),
944            reputation_scores,
945        };
946        let last_commit = self
947            .last_commit
948            .as_ref()
949            .expect("Last commit should already be set.");
950        self.commit_info_to_write
951            .push((last_commit.reference(), commit_info));
952    }
953
    /// Buffers a finalized commit, together with its rejected transactions keyed by block
    /// reference, in `finalized_commits_to_write`. The buffer is drained by `flush`.
    pub(crate) fn add_finalized_commit(
        &mut self,
        commit_ref: CommitRef,
        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
    ) {
        self.finalized_commits_to_write
            .push((commit_ref, rejected_transactions));
    }
962
963    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
964        let mut votes = Vec::new();
965        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
966            votes.push(self.pending_commit_votes.pop_front().unwrap());
967        }
968        votes
969    }
970
971    /// Index of the last commit.
972    pub(crate) fn last_commit_index(&self) -> CommitIndex {
973        match &self.last_commit {
974            Some(commit) => commit.index(),
975            None => 0,
976        }
977    }
978
979    /// Digest of the last commit.
980    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
981        match &self.last_commit {
982            Some(commit) => commit.digest(),
983            None => CommitDigest::MIN,
984        }
985    }
986
987    /// Timestamp of the last commit.
988    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
989        match &self.last_commit {
990            Some(commit) => commit.timestamp_ms(),
991            None => 0,
992        }
993    }
994
995    /// Leader slot of the last commit.
996    pub(crate) fn last_commit_leader(&self) -> Slot {
997        match &self.last_commit {
998            Some(commit) => commit.leader().into(),
999            None => self
1000                .genesis
1001                .iter()
1002                .next()
1003                .map(|(genesis_ref, _)| *genesis_ref)
1004                .expect("Genesis blocks should always be available.")
1005                .into(),
1006        }
1007    }
1008
1009    /// Highest round where a block is committed, which is last commit's leader round.
1010    pub(crate) fn last_commit_round(&self) -> Round {
1011        match &self.last_commit {
1012            Some(commit) => commit.leader().round,
1013            None => 0,
1014        }
1015    }
1016
    /// Last committed round per authority.
    ///
    /// Returns a copy of the internal vector, so callers do not observe later updates.
    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
        self.last_committed_rounds.clone()
    }
1021
    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
    /// committed leader round, by passing `last_commit_round()` to `calculate_gc_round`.
    ///
    /// NOTE(review): this comment previously stated that the genesis round is returned when GC is disabled;
    /// no such branch is visible here, so confirm whether that still holds.
    pub(crate) fn gc_round(&self) -> Round {
        self.calculate_gc_round(self.last_commit_round())
    }
1028
    /// Calculates the GC round from the input leader round, which can be different
    /// from the last committed leader round.
    ///
    /// The result is `commit_round` minus the protocol config's `gc_depth()`, saturating at
    /// zero instead of underflowing.
    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
    }
1034
1035    /// Flushes unpersisted blocks, commits and commit info to storage.
1036    ///
1037    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
1038    /// must also be buffered.
1039    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
1040    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
1041    ///
1042    /// After each flush, DagState becomes persisted in storage and it expected to recover
1043    /// all internal states from storage after restarts.
1044    pub(crate) fn flush(&mut self) {
1045        let _s = self
1046            .context
1047            .metrics
1048            .node_metrics
1049            .scope_processing_time
1050            .with_label_values(&["DagState::flush"])
1051            .start_timer();
1052
1053        // Flush buffered data to storage.
1054        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1055        let pending_commits = std::mem::take(&mut self.commits_to_write);
1056        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1057        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1058        if pending_blocks.is_empty()
1059            && pending_commits.is_empty()
1060            && pending_commit_info.is_empty()
1061            && pending_finalized_commits.is_empty()
1062        {
1063            return;
1064        }
1065
1066        debug!(
1067            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1068            pending_blocks.len(),
1069            pending_blocks
1070                .iter()
1071                .map(|b| b.reference().to_string())
1072                .join(","),
1073            pending_commits.len(),
1074            pending_commits
1075                .iter()
1076                .map(|c| c.reference().to_string())
1077                .join(","),
1078            pending_commit_info.len(),
1079            pending_commit_info
1080                .iter()
1081                .map(|(commit_ref, _)| commit_ref.to_string())
1082                .join(","),
1083            pending_finalized_commits.len(),
1084            pending_finalized_commits
1085                .iter()
1086                .map(|(commit_ref, _)| commit_ref.to_string())
1087                .join(","),
1088        );
1089        self.store
1090            .write(WriteBatch::new(
1091                pending_blocks,
1092                pending_commits,
1093                pending_commit_info,
1094                pending_finalized_commits,
1095            ))
1096            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1097        self.context
1098            .metrics
1099            .node_metrics
1100            .dag_state_store_write_count
1101            .inc();
1102
1103        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
1104        for (authority_index, _) in self.context.committee.authorities() {
1105            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1106            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1107                if block_ref.round <= eviction_round {
1108                    self.recent_blocks.remove(block_ref);
1109                    self.recent_refs_by_authority[authority_index].pop_first();
1110                } else {
1111                    break;
1112                }
1113            }
1114            self.evicted_rounds[authority_index] = eviction_round;
1115        }
1116
1117        let metrics = &self.context.metrics.node_metrics;
1118        metrics
1119            .dag_state_recent_blocks
1120            .set(self.recent_blocks.len() as i64);
1121        metrics.dag_state_recent_refs.set(
1122            self.recent_refs_by_authority
1123                .iter()
1124                .map(BTreeSet::len)
1125                .sum::<usize>() as i64,
1126        );
1127    }
1128
    /// Reads the last commit info from the store, returning `None` when the store has none.
    ///
    /// # Panics
    /// Panics if reading from storage fails.
    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
        self.store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
    }
1134
    /// Adds the given committed subdags to the scoring subdag.
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }
1138
    /// Clears the scoring subdag.
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }
1142
    /// Returns the number of scored subdags reported by the scoring subdag.
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }
1146
    /// Returns true if the scoring subdag is empty.
    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
        self.scoring_subdag.is_empty()
    }
1150
    /// Computes reputation scores from the scoring subdag by delegating to
    /// `ScoringSubdag::calculate_distributed_vote_scores`.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }
1154
    /// Returns the end of the scoring subdag's commit range.
    ///
    /// # Panics
    /// Panics if the scoring subdag has no commit range set.
    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
        self.scoring_subdag
            .commit_range
            .as_ref()
            .expect("commit range should exist for scoring subdag")
            .end()
    }
1162
1163    /// The last round that should get evicted after a cache clean up operation. After this round we are
1164    /// guaranteed to have all the produced blocks from that authority. For any round that is
1165    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1166    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1167        let last_round = self.recent_refs_by_authority[authority_index]
1168            .last()
1169            .map(|block_ref| block_ref.round)
1170            .unwrap_or(GENESIS_ROUND);
1171
1172        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1173    }
1174
1175    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1176    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1177    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1178        gc_round.min(last_round.saturating_sub(cached_rounds))
1179    }
1180
1181    /// Returns the underlying store.
1182    pub(crate) fn store(&self) -> Arc<dyn Store> {
1183        self.store.clone()
1184    }
1185
1186    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1187    /// the quorum even if that's genesis.
1188    #[cfg(test)]
1189    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1190        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1191        // a quorum then it means that our DAG has advanced with missing causal history.
1192        for round in
1193            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1194        {
1195            if round == GENESIS_ROUND {
1196                return self.genesis_blocks();
1197            }
1198            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1199            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1200
1201            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1202            let blocks = self.get_uncommitted_blocks_at_round(round);
1203            for block in &blocks {
1204                if quorum.add(block.author(), &self.context.committee) {
1205                    return blocks;
1206                }
1207            }
1208        }
1209
1210        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1211    }
1212
    /// Test helper: returns clones of all genesis blocks, in the genesis map's key order.
    #[cfg(test)]
    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
        self.genesis.values().cloned().collect()
    }
1217
    /// Test helper: overwrites the last commit directly. Unlike `add_commit`, no validation is
    /// performed and no other state (committed rounds, votes, write buffers) is updated.
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1222}
1223
/// Per-block bookkeeping entry: a verified block together with its commit and inclusion flags.
struct BlockInfo {
    /// The block this entry describes.
    block: VerifiedBlock,
    /// Whether the block has been committed.
    committed: bool,
    /// Whether the block has been included in the causal history of an owned proposed block.
    ///
    /// There are two usages of this field:
    /// 1. When proposing blocks, determine the set of blocks to carry votes for.
    /// 2. When recovering, determine if a block has not been included in a proposed block and
    ///    should recover transaction votes by voting.
    included: bool,
}
1236
impl BlockInfo {
    /// Wraps `block` in a fresh entry that is neither committed nor included.
    fn new(block: VerifiedBlock) -> Self {
        Self {
            block,
            committed: false,
            included: false,
        }
    }
}
1246
1247#[cfg(test)]
1248mod test {
1249    use std::vec;
1250
1251    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1252    use parking_lot::RwLock;
1253
1254    use super::*;
1255    use crate::{
1256        block::{TestBlock, VerifiedBlock},
1257        storage::{WriteBatch, mem_store::MemStore},
1258        test_dag_builder::DagBuilder,
1259        test_dag_parser::parse_dag,
1260    };
1261
1262    #[tokio::test]
1263    async fn test_get_blocks() {
1264        let (context, _) = Context::new_for_test(4);
1265        let context = Arc::new(context);
1266        let store = Arc::new(MemStore::new());
1267        let mut dag_state = DagState::new(context.clone(), store.clone());
1268        let own_index = AuthorityIndex::new_for_test(0);
1269
1270        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1271        let num_rounds: u32 = 10;
1272        let non_existent_round: u32 = 100;
1273        let num_authorities: u32 = 3;
1274        let num_blocks_per_slot: usize = 3;
1275        let mut blocks = BTreeMap::new();
1276        for round in 1..=num_rounds {
1277            for author in 0..num_authorities {
1278                // Create 3 blocks per slot, with different timestamps and digests.
1279                let base_ts = round as BlockTimestampMs * 1000;
1280                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1281                    let block = VerifiedBlock::new_for_test(
1282                        TestBlock::new(round, author)
1283                            .set_timestamp_ms(timestamp)
1284                            .build(),
1285                    );
1286                    dag_state.accept_block(block.clone());
1287                    blocks.insert(block.reference(), block);
1288
1289                    // Only write one block per slot for own index
1290                    if AuthorityIndex::new_for_test(author) == own_index {
1291                        break;
1292                    }
1293                }
1294            }
1295        }
1296
1297        // Check uncommitted blocks that exist.
1298        for (r, block) in &blocks {
1299            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1300        }
1301
1302        // Check uncommitted blocks that do not exist.
1303        let last_ref = blocks.keys().last().unwrap();
1304        assert!(
1305            dag_state
1306                .get_block(&BlockRef::new(
1307                    last_ref.round,
1308                    last_ref.author,
1309                    BlockDigest::MIN
1310                ))
1311                .is_none()
1312        );
1313
1314        // Check slots with uncommitted blocks.
1315        for round in 1..=num_rounds {
1316            for author in 0..num_authorities {
1317                let slot = Slot::new(
1318                    round,
1319                    context
1320                        .committee
1321                        .to_authority_index(author as usize)
1322                        .unwrap(),
1323                );
1324                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1325
1326                // We only write one block per slot for own index
1327                if AuthorityIndex::new_for_test(author) == own_index {
1328                    assert_eq!(blocks.len(), 1);
1329                } else {
1330                    assert_eq!(blocks.len(), num_blocks_per_slot);
1331                }
1332
1333                for b in blocks {
1334                    assert_eq!(b.round(), round);
1335                    assert_eq!(
1336                        b.author(),
1337                        context
1338                            .committee
1339                            .to_authority_index(author as usize)
1340                            .unwrap()
1341                    );
1342                }
1343            }
1344        }
1345
1346        // Check slots without uncommitted blocks.
1347        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1348        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1349
1350        // Check rounds with uncommitted blocks.
1351        for round in 1..=num_rounds {
1352            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1353            // Expect 3 blocks per authority except for own authority which should
1354            // have 1 block.
1355            assert_eq!(
1356                blocks.len(),
1357                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1358            );
1359            for b in blocks {
1360                assert_eq!(b.round(), round);
1361            }
1362        }
1363
1364        // Check rounds without uncommitted blocks.
1365        assert!(
1366            dag_state
1367                .get_uncommitted_blocks_at_round(non_existent_round)
1368                .is_empty()
1369        );
1370    }
1371
1372    #[tokio::test]
1373    async fn test_ancestors_at_uncommitted_round() {
1374        // Initialize DagState.
1375        let (context, _) = Context::new_for_test(4);
1376        let context = Arc::new(context);
1377        let store = Arc::new(MemStore::new());
1378        let mut dag_state = DagState::new(context.clone(), store.clone());
1379
1380        // Populate DagState.
1381
1382        // Round 10 refs will not have their blocks in DagState.
1383        let round_10_refs: Vec<_> = (0..4)
1384            .map(|a| {
1385                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1386                    .reference()
1387            })
1388            .collect();
1389
1390        // Round 11 blocks.
1391        let round_11 = vec![
1392            // This will connect to round 12.
1393            VerifiedBlock::new_for_test(
1394                TestBlock::new(11, 0)
1395                    .set_timestamp_ms(1100)
1396                    .set_ancestors(round_10_refs.clone())
1397                    .build(),
1398            ),
1399            // Slot(11, 1) has 3 blocks.
1400            // This will connect to round 12.
1401            VerifiedBlock::new_for_test(
1402                TestBlock::new(11, 1)
1403                    .set_timestamp_ms(1110)
1404                    .set_ancestors(round_10_refs.clone())
1405                    .build(),
1406            ),
1407            // This will connect to round 13.
1408            VerifiedBlock::new_for_test(
1409                TestBlock::new(11, 1)
1410                    .set_timestamp_ms(1111)
1411                    .set_ancestors(round_10_refs.clone())
1412                    .build(),
1413            ),
1414            // This will not connect to any block.
1415            VerifiedBlock::new_for_test(
1416                TestBlock::new(11, 1)
1417                    .set_timestamp_ms(1112)
1418                    .set_ancestors(round_10_refs.clone())
1419                    .build(),
1420            ),
1421            // This will not connect to any block.
1422            VerifiedBlock::new_for_test(
1423                TestBlock::new(11, 2)
1424                    .set_timestamp_ms(1120)
1425                    .set_ancestors(round_10_refs.clone())
1426                    .build(),
1427            ),
1428            // This will connect to round 12.
1429            VerifiedBlock::new_for_test(
1430                TestBlock::new(11, 3)
1431                    .set_timestamp_ms(1130)
1432                    .set_ancestors(round_10_refs.clone())
1433                    .build(),
1434            ),
1435        ];
1436
1437        // Round 12 blocks.
1438        let ancestors_for_round_12 = vec![
1439            round_11[0].reference(),
1440            round_11[1].reference(),
1441            round_11[5].reference(),
1442        ];
1443        let round_12 = vec![
1444            VerifiedBlock::new_for_test(
1445                TestBlock::new(12, 0)
1446                    .set_timestamp_ms(1200)
1447                    .set_ancestors(ancestors_for_round_12.clone())
1448                    .build(),
1449            ),
1450            VerifiedBlock::new_for_test(
1451                TestBlock::new(12, 2)
1452                    .set_timestamp_ms(1220)
1453                    .set_ancestors(ancestors_for_round_12.clone())
1454                    .build(),
1455            ),
1456            VerifiedBlock::new_for_test(
1457                TestBlock::new(12, 3)
1458                    .set_timestamp_ms(1230)
1459                    .set_ancestors(ancestors_for_round_12.clone())
1460                    .build(),
1461            ),
1462        ];
1463
1464        // Round 13 blocks.
1465        let ancestors_for_round_13 = vec![
1466            round_12[0].reference(),
1467            round_12[1].reference(),
1468            round_12[2].reference(),
1469            round_11[2].reference(),
1470        ];
1471        let round_13 = vec![
1472            VerifiedBlock::new_for_test(
1473                TestBlock::new(12, 1)
1474                    .set_timestamp_ms(1300)
1475                    .set_ancestors(ancestors_for_round_13.clone())
1476                    .build(),
1477            ),
1478            VerifiedBlock::new_for_test(
1479                TestBlock::new(12, 2)
1480                    .set_timestamp_ms(1320)
1481                    .set_ancestors(ancestors_for_round_13.clone())
1482                    .build(),
1483            ),
1484            VerifiedBlock::new_for_test(
1485                TestBlock::new(12, 3)
1486                    .set_timestamp_ms(1330)
1487                    .set_ancestors(ancestors_for_round_13.clone())
1488                    .build(),
1489            ),
1490        ];
1491
1492        // Round 14 anchor block.
1493        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1494        let anchor = VerifiedBlock::new_for_test(
1495            TestBlock::new(14, 1)
1496                .set_timestamp_ms(1410)
1497                .set_ancestors(ancestors_for_round_14)
1498                .build(),
1499        );
1500
1501        // Add all blocks (at and above round 11) to DagState.
1502        for b in round_11
1503            .iter()
1504            .chain(round_12.iter())
1505            .chain(round_13.iter())
1506            .chain([anchor.clone()].iter())
1507        {
1508            dag_state.accept_block(b.clone());
1509        }
1510
1511        // Check ancestors connected to anchor.
1512        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1513        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1514        ancestors_refs.sort();
1515        let mut expected_refs = vec![
1516            round_11[0].reference(),
1517            round_11[1].reference(),
1518            round_11[2].reference(),
1519            round_11[5].reference(),
1520        ];
1521        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1522        assert_eq!(
1523            ancestors_refs, expected_refs,
1524            "Expected round 11 ancestors: {:?}. Got: {:?}",
1525            expected_refs, ancestors_refs
1526        );
1527    }
1528
1529    #[tokio::test]
1530    async fn test_link_causal_history() {
1531        let (mut context, _) = Context::new_for_test(4);
1532        context.parameters.dag_state_cached_rounds = 10;
1533        context
1534            .protocol_config
1535            .set_consensus_gc_depth_for_testing(3);
1536        let context = Arc::new(context);
1537
1538        let store = Arc::new(MemStore::new());
1539        let mut dag_state = DagState::new(context.clone(), store.clone());
1540
1541        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1542        let mut dag_builder = DagBuilder::new(context.clone());
1543        dag_builder.layers(1..=3).build();
1544        dag_builder
1545            .layers(4..=6)
1546            .authorities(vec![AuthorityIndex::new_for_test(0)])
1547            .skip_block()
1548            .build();
1549
1550        // Accept all blocks
1551        let all_blocks = dag_builder.all_blocks();
1552        dag_state.accept_blocks(all_blocks.clone());
1553
1554        // No block is linked yet.
1555        for block in &all_blocks {
1556            assert!(!dag_state.has_been_included(&block.reference()));
1557        }
1558
1559        // Link causal history from a round 1 block.
1560        let round_1_block = &all_blocks[1];
1561        assert_eq!(round_1_block.round(), 1);
1562        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1563
1564        // Check that the block is linked.
1565        assert_eq!(linked_blocks.len(), 1);
1566        assert_eq!(linked_blocks[0], round_1_block.reference());
1567        for block_ref in linked_blocks {
1568            assert!(dag_state.has_been_included(&block_ref));
1569        }
1570
1571        // Link causal history from a round 2 block.
1572        let round_2_block = &all_blocks[4];
1573        assert_eq!(round_2_block.round(), 2);
1574        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1575
1576        // Check the linked blocks.
1577        assert_eq!(linked_blocks.len(), 4);
1578        for block_ref in linked_blocks {
1579            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1580        }
1581
1582        // Check linked status in dag state.
1583        for block in &all_blocks {
1584            if block.round() == 1 || block.reference() == round_2_block.reference() {
1585                assert!(dag_state.has_been_included(&block.reference()));
1586            } else {
1587                assert!(!dag_state.has_been_included(&block.reference()));
1588            }
1589        }
1590
1591        // Select round 6 block.
1592        let round_6_block = all_blocks.last().unwrap();
1593        assert_eq!(round_6_block.round(), 6);
1594
1595        // Get GC round to 3.
1596        let last_commit = TrustedCommit::new_for_test(
1597            6,
1598            CommitDigest::MIN,
1599            context.clock.timestamp_utc_ms(),
1600            round_6_block.reference(),
1601            vec![],
1602        );
1603        dag_state.set_last_commit(last_commit);
1604        assert_eq!(
1605            dag_state.gc_round(),
1606            3,
1607            "GC round should have moved to round 3"
1608        );
1609
1610        // Link causal history from a round 6 block.
1611        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1612
1613        // Check the linked blocks. They should not include GC'ed blocks.
1614        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1615        for block_ref in linked_blocks {
1616            assert!(
1617                block_ref.round == 4
1618                    || block_ref.round == 5
1619                    || block_ref == round_6_block.reference()
1620            );
1621        }
1622
1623        // Check linked status in dag state.
1624        for block in &all_blocks {
1625            let block_ref = block.reference();
1626            if block.round() == 1
1627                || block_ref == round_2_block.reference()
1628                || block_ref.round == 4
1629                || block_ref.round == 5
1630                || block_ref == round_6_block.reference()
1631            {
1632                assert!(dag_state.has_been_included(&block.reference()));
1633            } else {
1634                assert!(!dag_state.has_been_included(&block.reference()));
1635            }
1636        }
1637    }
1638
1639    #[tokio::test]
1640    async fn test_contains_blocks_in_cache_or_store() {
1641        /// Only keep elements up to 2 rounds before the last committed round
1642        const CACHED_ROUNDS: Round = 2;
1643
1644        let (mut context, _) = Context::new_for_test(4);
1645        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1646
1647        let context = Arc::new(context);
1648        let store = Arc::new(MemStore::new());
1649        let mut dag_state = DagState::new(context.clone(), store.clone());
1650
1651        // Create test blocks for round 1 ~ 10
1652        let num_rounds: u32 = 10;
1653        let num_authorities: u32 = 4;
1654        let mut blocks = Vec::new();
1655
1656        for round in 1..=num_rounds {
1657            for author in 0..num_authorities {
1658                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1659                blocks.push(block);
1660            }
1661        }
1662
1663        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1664        blocks.clone().into_iter().for_each(|block| {
1665            if block.round() <= 4 {
1666                store
1667                    .write(WriteBatch::default().blocks(vec![block]))
1668                    .unwrap();
1669            } else {
1670                dag_state.accept_blocks(vec![block]);
1671            }
1672        });
1673
1674        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1675        // where the blocks of first 4 round should be found in DagState and the rest in store.
1676        let mut block_refs = blocks
1677            .iter()
1678            .map(|block| block.reference())
1679            .collect::<Vec<_>>();
1680        let result = dag_state.contains_blocks(block_refs.clone());
1681
1682        // Ensure everything is found
1683        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1684        assert_eq!(result, expected);
1685
1686        // Now try to ask also for one block ref that is neither in cache nor in store
1687        block_refs.insert(
1688            3,
1689            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1690        );
1691        let result = dag_state.contains_blocks(block_refs.clone());
1692
1693        // Then all should be found apart from the last one
1694        expected.insert(3, false);
1695        assert_eq!(result, expected.clone());
1696    }
1697
1698    #[tokio::test]
1699    async fn test_contains_cached_block_at_slot() {
1700        /// Only keep elements up to 2 rounds before the last committed round
1701        const CACHED_ROUNDS: Round = 2;
1702
1703        let num_authorities: u32 = 4;
1704        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1705        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1706
1707        let context = Arc::new(context);
1708        let store = Arc::new(MemStore::new());
1709        let mut dag_state = DagState::new(context.clone(), store.clone());
1710
1711        // Create test blocks for round 1 ~ 10
1712        let num_rounds: u32 = 10;
1713        let mut blocks = Vec::new();
1714
1715        for round in 1..=num_rounds {
1716            for author in 0..num_authorities {
1717                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1718                blocks.push(block.clone());
1719                dag_state.accept_block(block);
1720            }
1721        }
1722
1723        // Query for genesis round 0, genesis blocks should be returned
1724        for (author, _) in context.committee.authorities() {
1725            assert!(
1726                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1727                "Genesis should always be found"
1728            );
1729        }
1730
1731        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1732        // where the blocks of first 4 round should be found in DagState and the rest in store.
1733        let mut block_refs = blocks
1734            .iter()
1735            .map(|block| block.reference())
1736            .collect::<Vec<_>>();
1737
1738        for block_ref in block_refs.clone() {
1739            let slot = block_ref.into();
1740            let found = dag_state.contains_cached_block_at_slot(slot);
1741            assert!(found, "A block should be found at slot {}", slot);
1742        }
1743
1744        // Now try to ask also for one block ref that is not in cache
1745        // Then all should be found apart from the last one
1746        block_refs.insert(
1747            3,
1748            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1749        );
1750        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1751        expected.insert(3, false);
1752
1753        // Attempt to check the same for via the contains slot method
1754        for block_ref in block_refs {
1755            let slot = block_ref.into();
1756            let found = dag_state.contains_cached_block_at_slot(slot);
1757
1758            assert_eq!(expected.remove(0), found);
1759        }
1760    }
1761
1762    #[tokio::test]
1763    #[ignore]
1764    #[should_panic(
1765        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1766    )]
1767    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1768        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1769        /// for the correct node operation.
1770        const GC_DEPTH: u32 = 2;
1771        /// Keep at least 3 rounds in cache for each authority.
1772        const CACHED_ROUNDS: Round = 3;
1773
1774        let (mut context, _) = Context::new_for_test(4);
1775        context
1776            .protocol_config
1777            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1778        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1779
1780        let context = Arc::new(context);
1781        let store = Arc::new(MemStore::new());
1782        let mut dag_state = DagState::new(context.clone(), store.clone());
1783
1784        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1785        let mut dag_builder = DagBuilder::new(context.clone());
1786        dag_builder.layers(1..=3).build();
1787        dag_builder
1788            .layers(4..=6)
1789            .authorities(vec![AuthorityIndex::new_for_test(0)])
1790            .skip_block()
1791            .build();
1792
1793        // Accept all blocks
1794        dag_builder
1795            .all_blocks()
1796            .into_iter()
1797            .for_each(|block| dag_state.accept_block(block));
1798
1799        // Now add a commit for leader round 5 to trigger an eviction
1800        dag_state.add_commit(TrustedCommit::new_for_test(
1801            1 as CommitIndex,
1802            CommitDigest::MIN,
1803            0,
1804            dag_builder.leader_block(5).unwrap().reference(),
1805            vec![],
1806        ));
1807        // Flush the DAG state to storage.
1808        dag_state.flush();
1809
1810        // Ensure that gc round has been updated
1811        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1812
1813        // Now what we expect to happen is for:
1814        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1815        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1816        for authority_index in 1..=3 {
1817            for round in 4..=6 {
1818                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1819                    round,
1820                    AuthorityIndex::new_for_test(authority_index)
1821                )));
1822            }
1823        }
1824
1825        for round in 1..=3 {
1826            assert!(
1827                dag_state.contains_cached_block_at_slot(Slot::new(
1828                    round,
1829                    AuthorityIndex::new_for_test(0)
1830                ))
1831            );
1832        }
1833
1834        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1835        // that is <= 3 should be evicted
1836        let _ =
1837            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1838    }
1839
1840    #[tokio::test]
1841    async fn test_get_blocks_in_cache_or_store() {
1842        let (context, _) = Context::new_for_test(4);
1843        let context = Arc::new(context);
1844        let store = Arc::new(MemStore::new());
1845        let mut dag_state = DagState::new(context.clone(), store.clone());
1846
1847        // Create test blocks for round 1 ~ 10
1848        let num_rounds: u32 = 10;
1849        let num_authorities: u32 = 4;
1850        let mut blocks = Vec::new();
1851
1852        for round in 1..=num_rounds {
1853            for author in 0..num_authorities {
1854                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1855                blocks.push(block);
1856            }
1857        }
1858
1859        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1860        blocks.clone().into_iter().for_each(|block| {
1861            if block.round() <= 4 {
1862                store
1863                    .write(WriteBatch::default().blocks(vec![block]))
1864                    .unwrap();
1865            } else {
1866                dag_state.accept_blocks(vec![block]);
1867            }
1868        });
1869
1870        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1871        // where the blocks of first 4 round should be found in DagState and the rest in store.
1872        let mut block_refs = blocks
1873            .iter()
1874            .map(|block| block.reference())
1875            .collect::<Vec<_>>();
1876        let result = dag_state.get_blocks(&block_refs);
1877
1878        let mut expected = blocks
1879            .into_iter()
1880            .map(Some)
1881            .collect::<Vec<Option<VerifiedBlock>>>();
1882
1883        // Ensure everything is found
1884        assert_eq!(result, expected.clone());
1885
1886        // Now try to ask also for one block ref that is neither in cache nor in store
1887        block_refs.insert(
1888            3,
1889            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1890        );
1891        let result = dag_state.get_blocks(&block_refs);
1892
1893        // Then all should be found apart from the last one
1894        expected.insert(3, None);
1895        assert_eq!(result, expected);
1896    }
1897
1898    #[tokio::test]
1899    async fn test_flush_and_recovery() {
1900        telemetry_subscribers::init_for_testing();
1901
1902        const GC_DEPTH: u32 = 3;
1903        const CACHED_ROUNDS: u32 = 4;
1904
1905        let num_authorities: u32 = 4;
1906        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1907        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1908        context
1909            .protocol_config
1910            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1911
1912        let context = Arc::new(context);
1913
1914        let store = Arc::new(MemStore::new());
1915        let mut dag_state = DagState::new(context.clone(), store.clone());
1916
1917        const NUM_ROUNDS: Round = 20;
1918        let mut dag_builder = DagBuilder::new(context.clone());
1919        dag_builder.layers(1..=5).build();
1920        dag_builder
1921            .layers(6..=8)
1922            .authorities(vec![AuthorityIndex::new_for_test(0)])
1923            .skip_block()
1924            .build();
1925        dag_builder.layers(9..=NUM_ROUNDS).build();
1926
1927        // Get all commits from the DAG builder.
1928        const LAST_COMMIT_ROUND: Round = 16;
1929        const LAST_COMMIT_INDEX: CommitIndex = 15;
1930        let commits = dag_builder
1931            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1932            .into_iter()
1933            .map(|(_subdag, commit)| commit)
1934            .take(LAST_COMMIT_INDEX as usize)
1935            .collect::<Vec<_>>();
1936        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1937        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1938
1939        // Add the blocks from first 11 rounds and first 8 commits to the dag state
1940        // Note that the commit of round 8 is missing because where authority 0 is the leader but produced no block.
1941        // So commit 8 has leader round 9.
1942        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1943        const NUM_PERSISTED_COMMITS: usize = 8;
1944        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1945        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1946        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1947        let mut finalized_commits = vec![];
1948        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1949            finalized_commits.push(commit.clone());
1950            dag_state.add_commit(commit);
1951        }
1952        let last_finalized_commit = finalized_commits.last().unwrap();
1953        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1954        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1955
1956        // Collect finalized blocks.
1957        let finalized_blocks = finalized_commits
1958            .iter()
1959            .flat_map(|commit| commit.blocks())
1960            .collect::<BTreeSet<_>>();
1961
1962        // Flush commits from the dag state
1963        dag_state.flush();
1964
1965        // Verify the store has blocks up to round 12, and commits up to index 8.
1966        let store_blocks = store
1967            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1968            .unwrap();
1969        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1970        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1971        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1972        assert_eq!(
1973            store_commits.last().unwrap().index(),
1974            LAST_PERSISTED_COMMIT_INDEX
1975        );
1976        assert_eq!(
1977            store_commits.last().unwrap().round(),
1978            LAST_PERSISTED_COMMIT_ROUND
1979        );
1980
1981        // Add the rest of the blocks and commits to the dag state
1982        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1983        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1984            dag_state.add_commit(commit);
1985        }
1986
1987        // All blocks should be found in DagState.
1988        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
1989        let block_refs = all_blocks
1990            .iter()
1991            .map(|block| block.reference())
1992            .collect::<Vec<_>>();
1993        let result = dag_state
1994            .get_blocks(&block_refs)
1995            .into_iter()
1996            .map(|b| b.unwrap())
1997            .collect::<Vec<_>>();
1998        assert_eq!(result, all_blocks);
1999
2000        // Last commit index from DagState should now be 15
2001        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2002
2003        // Destroy the dag state without flushing additional data.
2004        drop(dag_state);
2005
2006        // Recover the state from the store
2007        let dag_state = DagState::new(context.clone(), store.clone());
2008
2009        // Persisted blocks rounds should be found in DagState.
2010        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2011        let block_refs = all_blocks
2012            .iter()
2013            .map(|block| block.reference())
2014            .collect::<Vec<_>>();
2015        let result = dag_state
2016            .get_blocks(&block_refs)
2017            .into_iter()
2018            .map(|b| b.unwrap())
2019            .collect::<Vec<_>>();
2020        assert_eq!(result, all_blocks);
2021
2022        // Unpersisted blocks should not be in DagState, because they are not flushed.
2023        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2024        let block_refs = missing_blocks
2025            .iter()
2026            .map(|block| block.reference())
2027            .collect::<Vec<_>>();
2028        let retrieved_blocks = dag_state
2029            .get_blocks(&block_refs)
2030            .into_iter()
2031            .flatten()
2032            .collect::<Vec<_>>();
2033        assert!(retrieved_blocks.is_empty());
2034
2035        // Recovered last commit index and round should be 8 and 9.
2036        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2037        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2038
2039        // The last_commit_rounds of the finalized commits should have been recovered.
2040        let expected_last_committed_rounds = vec![5, 9, 8, 8];
2041        assert_eq!(
2042            dag_state.last_committed_rounds(),
2043            expected_last_committed_rounds
2044        );
2045        // Unscored subdags will be recovered based on the flushed commits and no commit info.
2046        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2047
2048        // Ensure that cached blocks exist only for specific rounds per authority
2049        for (authority_index, _) in context.committee.authorities() {
2050            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2051
2052            // Ensure that eviction rounds have been properly recovered.
2053            // For every authority, the gc round is 9 - 3 = 6, and cached round is 12-5 = 7.
2054            // So eviction round is the min which is 6.
2055            if authority_index == AuthorityIndex::new_for_test(0) {
2056                assert_eq!(blocks.len(), 4);
2057                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2058                assert!(
2059                    blocks
2060                        .into_iter()
2061                        .all(|block| block.round() >= 7 && block.round() <= 12)
2062                );
2063            } else {
2064                assert_eq!(blocks.len(), 6);
2065                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2066                assert!(
2067                    blocks
2068                        .into_iter()
2069                        .all(|block| block.round() >= 7 && block.round() <= 12)
2070                );
2071            }
2072        }
2073
2074        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
2075        let gc_round = dag_state.gc_round();
2076        assert_eq!(gc_round, 6);
2077        dag_state
2078            .recent_blocks
2079            .iter()
2080            .for_each(|(block_ref, block_info)| {
2081                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2082                    assert!(
2083                        block_info.committed,
2084                        "Block {:?} should be set as committed",
2085                        block_ref
2086                    );
2087                }
2088            });
2089
2090        // Ensure the hard linked status of blocks are recovered.
2091        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
2092        // Other blocks (round 12 but not from authority 0) should not be hard linked.
2093        // This is because authority 0 blocks are considered proposed blocks.
2094        dag_state
2095            .recent_blocks
2096            .iter()
2097            .for_each(|(block_ref, block_info)| {
2098                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2099                    assert!(block_info.included);
2100                } else {
2101                    assert!(!block_info.included);
2102                }
2103            });
2104    }
2105
2106    #[tokio::test]
2107    async fn test_block_info_as_committed() {
2108        let num_authorities: u32 = 4;
2109        let (context, _) = Context::new_for_test(num_authorities as usize);
2110        let context = Arc::new(context);
2111
2112        let store = Arc::new(MemStore::new());
2113        let mut dag_state = DagState::new(context.clone(), store.clone());
2114
2115        // Accept a block
2116        let block = VerifiedBlock::new_for_test(
2117            TestBlock::new(1, 0)
2118                .set_timestamp_ms(1000)
2119                .set_ancestors(vec![])
2120                .build(),
2121        );
2122
2123        dag_state.accept_block(block.clone());
2124
2125        // Query is committed
2126        assert!(!dag_state.is_committed(&block.reference()));
2127
2128        // Set block as committed for first time should return true
2129        assert!(
2130            dag_state.set_committed(&block.reference()),
2131            "Block should be successfully set as committed for first time"
2132        );
2133
2134        // Now it should appear as committed
2135        assert!(dag_state.is_committed(&block.reference()));
2136
2137        // Trying to set the block as committed again, it should return false.
2138        assert!(
2139            !dag_state.set_committed(&block.reference()),
2140            "Block should not be successfully set as committed"
2141        );
2142    }
2143
2144    #[tokio::test]
2145    async fn test_get_cached_blocks() {
2146        let (mut context, _) = Context::new_for_test(4);
2147        context.parameters.dag_state_cached_rounds = 5;
2148
2149        let context = Arc::new(context);
2150        let store = Arc::new(MemStore::new());
2151        let mut dag_state = DagState::new(context.clone(), store.clone());
2152
2153        // Create no blocks for authority 0
2154        // Create one block (round 10) for authority 1
2155        // Create two blocks (rounds 10,11) for authority 2
2156        // Create three blocks (rounds 10,11,12) for authority 3
2157        let mut all_blocks = Vec::new();
2158        for author in 1..=3 {
2159            for round in 10..(10 + author) {
2160                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2161                all_blocks.push(block.clone());
2162                dag_state.accept_block(block);
2163            }
2164        }
2165
2166        // Test get_cached_blocks()
2167
2168        let cached_blocks =
2169            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2170        assert!(cached_blocks.is_empty());
2171
2172        let cached_blocks =
2173            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2174        assert_eq!(cached_blocks.len(), 1);
2175        assert_eq!(cached_blocks[0].round(), 10);
2176
2177        let cached_blocks =
2178            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2179        assert_eq!(cached_blocks.len(), 2);
2180        assert_eq!(cached_blocks[0].round(), 10);
2181        assert_eq!(cached_blocks[1].round(), 11);
2182
2183        let cached_blocks =
2184            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2185        assert_eq!(cached_blocks.len(), 1);
2186        assert_eq!(cached_blocks[0].round(), 11);
2187
2188        let cached_blocks =
2189            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2190        assert_eq!(cached_blocks.len(), 3);
2191        assert_eq!(cached_blocks[0].round(), 10);
2192        assert_eq!(cached_blocks[1].round(), 11);
2193        assert_eq!(cached_blocks[2].round(), 12);
2194
2195        let cached_blocks =
2196            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2197        assert_eq!(cached_blocks.len(), 1);
2198        assert_eq!(cached_blocks[0].round(), 12);
2199
2200        // Test get_cached_blocks_in_range()
2201
2202        // Start == end
2203        let cached_blocks = dag_state.get_cached_blocks_in_range(
2204            context.committee.to_authority_index(3).unwrap(),
2205            10,
2206            10,
2207            1,
2208        );
2209        assert!(cached_blocks.is_empty());
2210
2211        // Start > end
2212        let cached_blocks = dag_state.get_cached_blocks_in_range(
2213            context.committee.to_authority_index(3).unwrap(),
2214            11,
2215            10,
2216            1,
2217        );
2218        assert!(cached_blocks.is_empty());
2219
2220        // Empty result.
2221        let cached_blocks = dag_state.get_cached_blocks_in_range(
2222            context.committee.to_authority_index(0).unwrap(),
2223            9,
2224            10,
2225            1,
2226        );
2227        assert!(cached_blocks.is_empty());
2228
2229        // Single block, one round before the end.
2230        let cached_blocks = dag_state.get_cached_blocks_in_range(
2231            context.committee.to_authority_index(1).unwrap(),
2232            9,
2233            11,
2234            1,
2235        );
2236        assert_eq!(cached_blocks.len(), 1);
2237        assert_eq!(cached_blocks[0].round(), 10);
2238
2239        // Respect end round.
2240        let cached_blocks = dag_state.get_cached_blocks_in_range(
2241            context.committee.to_authority_index(2).unwrap(),
2242            9,
2243            12,
2244            5,
2245        );
2246        assert_eq!(cached_blocks.len(), 2);
2247        assert_eq!(cached_blocks[0].round(), 10);
2248        assert_eq!(cached_blocks[1].round(), 11);
2249
2250        // Respect start round.
2251        let cached_blocks = dag_state.get_cached_blocks_in_range(
2252            context.committee.to_authority_index(3).unwrap(),
2253            11,
2254            20,
2255            5,
2256        );
2257        assert_eq!(cached_blocks.len(), 2);
2258        assert_eq!(cached_blocks[0].round(), 11);
2259        assert_eq!(cached_blocks[1].round(), 12);
2260
2261        // Respect limit
2262        let cached_blocks = dag_state.get_cached_blocks_in_range(
2263            context.committee.to_authority_index(3).unwrap(),
2264            10,
2265            20,
2266            1,
2267        );
2268        assert_eq!(cached_blocks.len(), 1);
2269        assert_eq!(cached_blocks[0].round(), 10);
2270    }
2271
2272    #[tokio::test]
2273    async fn test_get_last_cached_block() {
2274        // GIVEN
2275        const CACHED_ROUNDS: Round = 2;
2276        const GC_DEPTH: u32 = 1;
2277        let (mut context, _) = Context::new_for_test(4);
2278        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2279        context
2280            .protocol_config
2281            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2282
2283        let context = Arc::new(context);
2284        let store = Arc::new(MemStore::new());
2285        let mut dag_state = DagState::new(context.clone(), store.clone());
2286
2287        // Create no blocks for authority 0
2288        // Create one block (round 1) for authority 1
2289        // Create two blocks (rounds 1,2) for authority 2
2290        // Create three blocks (rounds 1,2,3) for authority 3
2291        let dag_str = "DAG {
2292            Round 0 : { 4 },
2293            Round 1 : {
2294                B -> [*],
2295                C -> [*],
2296                D -> [*],
2297            },
2298            Round 2 : {
2299                C -> [*],
2300                D -> [*],
2301            },
2302            Round 3 : {
2303                D -> [*],
2304            },
2305        }";
2306
2307        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2308
2309        // Add equivocating block for round 2 authority 3
2310        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2311
2312        // Accept all blocks
2313        for block in dag_builder
2314            .all_blocks()
2315            .into_iter()
2316            .chain(std::iter::once(block))
2317        {
2318            dag_state.accept_block(block);
2319        }
2320
2321        dag_state.add_commit(TrustedCommit::new_for_test(
2322            1 as CommitIndex,
2323            CommitDigest::MIN,
2324            context.clock.timestamp_utc_ms(),
2325            dag_builder.leader_block(3).unwrap().reference(),
2326            vec![],
2327        ));
2328
2329        // WHEN search for the latest blocks
2330        let end_round = 4;
2331        let expected_rounds = vec![0, 1, 2, 3];
2332        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2333        // THEN
2334        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2335        assert_eq!(
2336            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2337            expected_rounds
2338        );
2339        assert_eq!(
2340            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2341            expected_excluded_and_equivocating_blocks
2342        );
2343
2344        // THEN
2345        for (i, expected_round) in expected_rounds.iter().enumerate() {
2346            let round = dag_state
2347                .get_last_cached_block_in_range(
2348                    context.committee.to_authority_index(i).unwrap(),
2349                    0,
2350                    end_round,
2351                )
2352                .map(|b| b.round())
2353                .unwrap_or_default();
2354            assert_eq!(round, *expected_round, "Authority {i}");
2355        }
2356
2357        // WHEN starting from round 2
2358        let start_round = 2;
2359        let expected_rounds = [0, 0, 2, 3];
2360
2361        // THEN
2362        for (i, expected_round) in expected_rounds.iter().enumerate() {
2363            let round = dag_state
2364                .get_last_cached_block_in_range(
2365                    context.committee.to_authority_index(i).unwrap(),
2366                    start_round,
2367                    end_round,
2368                )
2369                .map(|b| b.round())
2370                .unwrap_or_default();
2371            assert_eq!(round, *expected_round, "Authority {i}");
2372        }
2373
2374        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
2375        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
2376        //
2377        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
2378        // all their highest round blocks for CACHED_ROUNDS.
2379        dag_state.flush();
2380
2381        // AND we request before round 3
2382        let end_round = 3;
2383        let expected_rounds = vec![0, 1, 2, 2];
2384
2385        // THEN
2386        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2387        assert_eq!(
2388            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2389            expected_rounds
2390        );
2391
2392        // THEN
2393        for (i, expected_round) in expected_rounds.iter().enumerate() {
2394            let round = dag_state
2395                .get_last_cached_block_in_range(
2396                    context.committee.to_authority_index(i).unwrap(),
2397                    0,
2398                    end_round,
2399                )
2400                .map(|b| b.round())
2401                .unwrap_or_default();
2402            assert_eq!(round, *expected_round, "Authority {i}");
2403        }
2404    }
2405
2406    #[tokio::test]
2407    #[should_panic(
2408        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2409    )]
2410    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2411        // GIVEN
2412        const CACHED_ROUNDS: Round = 1;
2413        const GC_DEPTH: u32 = 1;
2414        let (mut context, _) = Context::new_for_test(4);
2415        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2416        context
2417            .protocol_config
2418            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2419
2420        let context = Arc::new(context);
2421        let store = Arc::new(MemStore::new());
2422        let mut dag_state = DagState::new(context.clone(), store.clone());
2423
2424        // Create no blocks for authority 0
2425        // Create one block (round 1) for authority 1
2426        // Create two blocks (rounds 1,2) for authority 2
2427        // Create three blocks (rounds 1,2,3) for authority 3
2428        let mut dag_builder = DagBuilder::new(context.clone());
2429        dag_builder
2430            .layers(1..=1)
2431            .authorities(vec![AuthorityIndex::new_for_test(0)])
2432            .skip_block()
2433            .build();
2434        dag_builder
2435            .layers(2..=2)
2436            .authorities(vec![
2437                AuthorityIndex::new_for_test(0),
2438                AuthorityIndex::new_for_test(1),
2439            ])
2440            .skip_block()
2441            .build();
2442        dag_builder
2443            .layers(3..=3)
2444            .authorities(vec![
2445                AuthorityIndex::new_for_test(0),
2446                AuthorityIndex::new_for_test(1),
2447                AuthorityIndex::new_for_test(2),
2448            ])
2449            .skip_block()
2450            .build();
2451
2452        // Accept all blocks
2453        for block in dag_builder.all_blocks() {
2454            dag_state.accept_block(block);
2455        }
2456
2457        dag_state.add_commit(TrustedCommit::new_for_test(
2458            1 as CommitIndex,
2459            CommitDigest::MIN,
2460            0,
2461            dag_builder.leader_block(3).unwrap().reference(),
2462            vec![],
2463        ));
2464
2465        // Flush the store so we update the evict rounds
2466        dag_state.flush();
2467
2468        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2469        dag_state.get_last_cached_block_per_authority(2);
2470    }
2471
2472    #[tokio::test]
2473    async fn test_last_quorum() {
2474        // GIVEN
2475        let (context, _) = Context::new_for_test(4);
2476        let context = Arc::new(context);
2477        let store = Arc::new(MemStore::new());
2478        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2479
2480        // WHEN no blocks exist then genesis should be returned
2481        {
2482            let genesis = genesis_blocks(context.as_ref());
2483
2484            assert_eq!(dag_state.read().last_quorum(), genesis);
2485        }
2486
2487        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2488        {
2489            let mut dag_builder = DagBuilder::new(context.clone());
2490            dag_builder
2491                .layers(1..=4)
2492                .build()
2493                .persist_layers(dag_state.clone());
2494            let round_4_blocks: Vec<_> = dag_builder
2495                .blocks(4..=4)
2496                .into_iter()
2497                .map(|block| block.reference())
2498                .collect();
2499
2500            let last_quorum = dag_state.read().last_quorum();
2501
2502            assert_eq!(
2503                last_quorum
2504                    .into_iter()
2505                    .map(|block| block.reference())
2506                    .collect::<Vec<_>>(),
2507                round_4_blocks
2508            );
2509        }
2510
2511        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2512        {
2513            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2514            dag_state.write().accept_block(block);
2515
2516            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2517
2518            let last_quorum = dag_state.read().last_quorum();
2519
2520            assert_eq!(last_quorum, round_4_blocks);
2521        }
2522    }
2523
2524    #[tokio::test]
2525    async fn test_last_block_for_authority() {
2526        // GIVEN
2527        let (context, _) = Context::new_for_test(4);
2528        let context = Arc::new(context);
2529        let store = Arc::new(MemStore::new());
2530        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2531
2532        // WHEN no blocks exist then genesis should be returned
2533        {
2534            let genesis = genesis_blocks(context.as_ref());
2535            let my_genesis = genesis
2536                .into_iter()
2537                .find(|block| block.author() == context.own_index)
2538                .unwrap();
2539
2540            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2541        }
2542
2543        // WHEN adding some blocks for authorities, only the last ones should be returned
2544        {
2545            // add blocks up to round 4
2546            let mut dag_builder = DagBuilder::new(context.clone());
2547            dag_builder
2548                .layers(1..=4)
2549                .build()
2550                .persist_layers(dag_state.clone());
2551
2552            // add block 5 for authority 0
2553            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2554            dag_state.write().accept_block(block);
2555
2556            let block = dag_state
2557                .read()
2558                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2559            assert_eq!(block.round(), 5);
2560
2561            for (authority_index, _) in context.committee.authorities() {
2562                let block = dag_state
2563                    .read()
2564                    .get_last_block_for_authority(authority_index);
2565
2566                if authority_index.value() == 0 {
2567                    assert_eq!(block.round(), 5);
2568                } else {
2569                    assert_eq!(block.round(), 4);
2570                }
2571            }
2572        }
2573    }
2574
2575    #[tokio::test]
2576    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2577        // GIVEN
2578        let (context, _) = Context::new_for_test(4);
2579        let context = Arc::new(context);
2580        let store = Arc::new(MemStore::new());
2581        let mut dag_state = DagState::new(context.clone(), store.clone());
2582
2583        // Set a timestamp for the block that is ahead of the current time
2584        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2585
2586        let block = VerifiedBlock::new_for_test(
2587            TestBlock::new(10, 0)
2588                .set_timestamp_ms(block_timestamp)
2589                .build(),
2590        );
2591
2592        // Try to accept the block - it should not panic
2593        dag_state.accept_block(block);
2594    }
2595
2596    #[tokio::test]
2597    async fn test_last_finalized_commit() {
2598        // GIVEN
2599        let (context, _) = Context::new_for_test(4);
2600        let context = Arc::new(context);
2601        let store = Arc::new(MemStore::new());
2602        let mut dag_state = DagState::new(context.clone(), store.clone());
2603
2604        // WHEN adding a finalized commit
2605        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2606        let rejected_transactions = BTreeMap::new();
2607        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2608
2609        // THEN the commit should be added to the buffer
2610        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2611        assert_eq!(
2612            dag_state.finalized_commits_to_write[0],
2613            (commit_ref, rejected_transactions.clone())
2614        );
2615
2616        // WHEN flushing the DAG state
2617        dag_state.flush();
2618
2619        // THEN the commit and rejected transactions should be written to storage
2620        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2621        assert_eq!(last_finalized_commit, Some(commit_ref));
2622        let stored_rejected_transactions = store
2623            .read_rejected_transactions(commit_ref)
2624            .unwrap()
2625            .unwrap();
2626        assert_eq!(stored_rejected_transactions, rejected_transactions);
2627    }
2628}