consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21    CommittedSubDag,
22    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23    commit::{
24        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26    },
27    context::Context,
28    leader_scoring::{ReputationScores, ScoringSubdag},
29    storage::{Store, WriteBatch},
30    threshold_clock::ThresholdClock,
31};
32
33/// DagState provides the API to write and read accepted blocks from the DAG.
34/// Only uncommitted and last committed blocks are cached in memory.
35/// The rest of blocks are stored on disk.
36/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
37///
38/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
39/// concurrent access from multiple components.
40pub struct DagState {
41    context: Arc<Context>,
42
43    // The genesis blocks
44    genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
47    // Note: all uncommitted blocks are kept in memory.
48    //
49    // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always have available
50    // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS.
51    // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`.
52    // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached.
53    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55    // Indexes recent block refs by their authorities.
56    // Vec position corresponds to the authority index.
57    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59    // Keeps track of the threshold clock for proposing blocks.
60    threshold_clock: ThresholdClock,
61
62    // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
63    // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear.
64    // The `evicted_rounds` size should be the same as the committee size.
65    evicted_rounds: Vec<Round>,
66
67    // Highest round of blocks accepted.
68    highest_accepted_round: Round,
69
70    // Last consensus commit of the dag.
71    last_commit: Option<TrustedCommit>,
72
73    // Last wall time when commit round advanced. Does not persist across restarts.
74    last_commit_round_advancement_time: Option<std::time::Instant>,
75
76    // Last committed rounds per authority.
77    last_committed_rounds: Vec<Round>,
78
79    /// The committed subdags that have been scored but scores have not been used
80    /// for leader schedule yet.
81    scoring_subdag: ScoringSubdag,
82
83    // Commit votes pending to be included in new blocks.
84    // TODO: limit to 1st commit per round with multi-leader.
85    // TODO: recover unproposed pending commit votes at startup.
86    pending_commit_votes: VecDeque<CommitVote>,
87
88    // Blocks and commits must be buffered for persistence before they can be
89    // inserted into the local DAG or sent to output.
90    blocks_to_write: Vec<VerifiedBlock>,
91    commits_to_write: Vec<TrustedCommit>,
92
93    // Buffers the reputation scores & last_committed_rounds to be flushed with the
94    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
95    // & last_committed_rounds from the commits as needed.
96    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98    // Buffers finalized commits and their rejected transactions to be written to storage.
99    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101    // Persistent storage for blocks, commits and other consensus data.
102    store: Arc<dyn Store>,
103
104    // The number of cached rounds
105    cached_rounds: Round,
106}
107
108impl DagState {
109    /// Initializes DagState from storage.
110    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112        let num_authorities = context.committee.size();
113
114        let genesis = genesis_blocks(context.as_ref())
115            .into_iter()
116            .map(|block| (block.reference(), block))
117            .collect();
118
119        let threshold_clock = ThresholdClock::new(1, context.clone());
120
121        let last_commit = store
122            .read_last_commit()
123            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125        let commit_info = store
126            .read_last_commit_info()
127            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128        let (mut last_committed_rounds, commit_recovery_start_index) =
129            if let Some((commit_ref, commit_info)) = commit_info {
130                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131                (commit_info.committed_rounds, commit_ref.index + 1)
132            } else {
133                tracing::info!("Found no stored CommitInfo to recover from");
134                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135            };
136
137        let mut unscored_committed_subdags = Vec::new();
138        let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140        if let Some(last_commit) = last_commit.as_ref() {
141            store
142                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144                .iter()
145                .for_each(|commit| {
146                    for block_ref in commit.blocks() {
147                        last_committed_rounds[block_ref.author] =
148                            max(last_committed_rounds[block_ref.author], block_ref.round);
149                    }
150                    let committed_subdag =
151                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
152                    // We don't need to recover reputation scores for unscored_committed_subdags
153                    unscored_committed_subdags.push(committed_subdag);
154                });
155        }
156
157        tracing::info!(
158            "DagState was initialized with the following state: \
159            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
160            unscored_committed_subdags.len()
161        );
162
163        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
164
165        let mut state = Self {
166            context: context.clone(),
167            genesis,
168            recent_blocks: BTreeMap::new(),
169            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
170            threshold_clock,
171            highest_accepted_round: 0,
172            last_commit: last_commit.clone(),
173            last_commit_round_advancement_time: None,
174            last_committed_rounds: last_committed_rounds.clone(),
175            pending_commit_votes: VecDeque::new(),
176            blocks_to_write: vec![],
177            commits_to_write: vec![],
178            commit_info_to_write: vec![],
179            finalized_commits_to_write: vec![],
180            scoring_subdag,
181            store: store.clone(),
182            cached_rounds,
183            evicted_rounds: vec![0; num_authorities],
184        };
185
186        for (authority_index, _) in context.committee.authorities() {
187            let (blocks, eviction_round) = {
188                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
189                // As reminder, the eviction round is taking into account the gc_round.
190                let last_block = state
191                    .store
192                    .scan_last_blocks_by_author(authority_index, 1, None)
193                    .expect("Database error");
194                let last_block_round = last_block
195                    .last()
196                    .map(|b| b.round())
197                    .unwrap_or(GENESIS_ROUND);
198
199                let eviction_round =
200                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
201                let blocks = state
202                    .store
203                    .scan_blocks_by_author(authority_index, eviction_round + 1)
204                    .expect("Database error");
205
206                (blocks, eviction_round)
207            };
208
209            state.evicted_rounds[authority_index] = eviction_round;
210
211            // Update the block metadata for the authority.
212            for block in &blocks {
213                state.update_block_metadata(block);
214            }
215
216            debug!(
217                "Recovered blocks {}: {:?}",
218                authority_index,
219                blocks
220                    .iter()
221                    .map(|b| b.reference())
222                    .collect::<Vec<BlockRef>>()
223            );
224        }
225
226        if let Some(last_commit) = last_commit {
227            let mut index = last_commit.index();
228            let gc_round = state.gc_round();
229            info!(
230                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
231                index, gc_round
232            );
233
234            loop {
235                let commits = store
236                    .scan_commits((index..=index).into())
237                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
238                let Some(commit) = commits.first() else {
239                    info!("Recovering finished up to index {index}, no more commits to recover");
240                    break;
241                };
242
243                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
244                if gc_round > 0 && commit.leader().round <= gc_round {
245                    info!(
246                        "Recovering finished, reached commit leader round {} <= gc_round {}",
247                        commit.leader().round,
248                        gc_round
249                    );
250                    break;
251                }
252
253                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
254                    debug!(
255                        "Setting block {:?} as committed based on commit {:?}",
256                        block_ref,
257                        commit.index()
258                    );
259                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
260                });
261
262                // All commits are indexed starting from 1, so one reach zero exit.
263                index = index.saturating_sub(1);
264                if index == 0 {
265                    break;
266                }
267            }
268        }
269
270        // Recover hard linked statuses for blocks within GC round.
271        let proposed_blocks = store
272            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
273            .expect("Database error");
274        for block in proposed_blocks {
275            state.link_causal_history(block.reference());
276        }
277
278        state
279    }
280
281    /// Accepts a block into DagState and keeps it in memory.
282    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283        assert_ne!(
284            block.round(),
285            0,
286            "Genesis block should not be accepted into DAG."
287        );
288
289        let block_ref = block.reference();
290        if self.contains_block(&block_ref) {
291            return;
292        }
293
294        let now = self.context.clock.timestamp_utc_ms();
295        if block.timestamp_ms() > now {
296            trace!(
297                "Block {:?} with timestamp {} is greater than local timestamp {}.",
298                block,
299                block.timestamp_ms(),
300                now,
301            );
302        }
303        let hostname = &self.context.committee.authority(block_ref.author).hostname;
304        self.context
305            .metrics
306            .node_metrics
307            .accepted_block_time_drift_ms
308            .with_label_values(&[hostname])
309            .inc_by(block.timestamp_ms().saturating_sub(now));
310
311        // TODO: Move this check to core
312        // Ensure we don't write multiple blocks per slot for our own index
313        if block_ref.author == self.context.own_index {
314            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315            assert!(
316                existing_blocks.is_empty(),
317                "Block Rejected! Attempted to add block {block:#?} to own slot where \
318                block(s) {existing_blocks:#?} already exists."
319            );
320        }
321        self.update_block_metadata(&block);
322        self.blocks_to_write.push(block);
323        let source = if self.context.own_index == block_ref.author {
324            "own"
325        } else {
326            "others"
327        };
328        self.context
329            .metrics
330            .node_metrics
331            .accepted_blocks
332            .with_label_values(&[source])
333            .inc();
334    }
335
336    /// Updates internal metadata for a block.
337    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338        let block_ref = block.reference();
339        self.recent_blocks
340            .insert(block_ref, BlockInfo::new(block.clone()));
341        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343        if self.threshold_clock.add_block(block_ref) {
344            // Do not measure quorum delay when no local block is proposed in the round.
345            let last_proposed_block = self.get_last_proposed_block();
346            if last_proposed_block.round() == block_ref.round {
347                let quorum_delay_ms = self
348                    .context
349                    .clock
350                    .timestamp_utc_ms()
351                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352                self.context
353                    .metrics
354                    .node_metrics
355                    .quorum_receive_latency
356                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357            }
358        }
359
360        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361        self.context
362            .metrics
363            .node_metrics
364            .highest_accepted_round
365            .set(self.highest_accepted_round as i64);
366
367        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368            .last()
369            .map(|block_ref| block_ref.round)
370            .expect("There should be by now at least one block ref");
371        let hostname = &self.context.committee.authority(block_ref.author).hostname;
372        self.context
373            .metrics
374            .node_metrics
375            .highest_accepted_authority_round
376            .with_label_values(&[hostname])
377            .set(highest_accepted_round_for_author as i64);
378    }
379
380    /// Accepts a blocks into DagState and keeps it in memory.
381    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382        debug!(
383            "Accepting blocks: {}",
384            blocks.iter().map(|b| b.reference().to_string()).join(",")
385        );
386        for block in blocks {
387            self.accept_block(block);
388        }
389    }
390
391    /// Gets a block by checking cached recent blocks then storage.
392    /// Returns None when the block is not found.
393    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394        self.get_blocks(&[*reference])
395            .pop()
396            .expect("Exactly one element should be returned")
397    }
398
399    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
400    /// An element is None when the corresponding block is not found.
401    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402        let mut blocks = vec![None; block_refs.len()];
403        let mut missing = Vec::new();
404
405        for (index, block_ref) in block_refs.iter().enumerate() {
406            if block_ref.round == GENESIS_ROUND {
407                // Allow the caller to handle the invalid genesis ancestor error.
408                if let Some(block) = self.genesis.get(block_ref) {
409                    blocks[index] = Some(block.clone());
410                }
411                continue;
412            }
413            if let Some(block_info) = self.recent_blocks.get(block_ref) {
414                blocks[index] = Some(block_info.block.clone());
415                continue;
416            }
417            missing.push((index, block_ref));
418        }
419
420        if missing.is_empty() {
421            return blocks;
422        }
423
424        let missing_refs = missing
425            .iter()
426            .map(|(_, block_ref)| **block_ref)
427            .collect::<Vec<_>>();
428        let store_results = self
429            .store
430            .read_blocks(&missing_refs)
431            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432        self.context
433            .metrics
434            .node_metrics
435            .dag_state_store_read_count
436            .with_label_values(&["get_blocks"])
437            .inc();
438
439        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440            blocks[index] = result;
441        }
442
443        blocks
444    }
445
446    /// Gets all uncommitted blocks in a slot.
447    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
448    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449        // TODO: either panic below when the slot is at or below the last committed round,
450        // or support reading from storage while limiting storage reads to edge cases.
451
452        let mut blocks = vec![];
453        for (_block_ref, block_info) in self.recent_blocks.range((
454            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456        )) {
457            blocks.push(block_info.block.clone())
458        }
459        blocks
460    }
461
462    /// Gets all uncommitted blocks in a round.
463    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
464    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465        if round <= self.last_commit_round() {
466            panic!("Round {} have committed blocks!", round);
467        }
468
469        let mut blocks = vec![];
470        for (_block_ref, block_info) in self.recent_blocks.range((
471            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472            Excluded(BlockRef::new(
473                round + 1,
474                AuthorityIndex::ZERO,
475                BlockDigest::MIN,
476            )),
477        )) {
478            blocks.push(block_info.block.clone())
479        }
480        blocks
481    }
482
483    /// Gets all ancestors in the history of a block at a certain round.
484    pub(crate) fn ancestors_at_round(
485        &self,
486        later_block: &VerifiedBlock,
487        earlier_round: Round,
488    ) -> Vec<VerifiedBlock> {
489        // Iterate through ancestors of later_block in round descending order.
490        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491        while !linked.is_empty() {
492            let round = linked.last().unwrap().round;
493            // Stop after finishing traversal for ancestors above earlier_round.
494            if round <= earlier_round {
495                break;
496            }
497            let block_ref = linked.pop_last().unwrap();
498            let Some(block) = self.get_block(&block_ref) else {
499                panic!("Block {:?} should exist in DAG!", block_ref);
500            };
501            linked.extend(block.ancestors().iter().cloned());
502        }
503        linked
504            .range((
505                Included(BlockRef::new(
506                    earlier_round,
507                    AuthorityIndex::ZERO,
508                    BlockDigest::MIN,
509                )),
510                Unbounded,
511            ))
512            .map(|r| {
513                self.get_block(r)
514                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515                    .clone()
516            })
517            .collect()
518    }
519
520    /// Gets the last proposed block from this authority.
521    /// If no block is proposed yet, returns the genesis block.
522    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
523        self.get_last_block_for_authority(self.context.own_index)
524    }
525
526    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
527    /// then the genesis block is returned as no other block has been received from that authority.
528    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529        if let Some(last) = self.recent_refs_by_authority[authority].last() {
530            return self
531                .recent_blocks
532                .get(last)
533                .expect("Block should be found in recent blocks")
534                .block
535                .clone();
536        }
537
538        // if none exists, then fallback to genesis
539        let (_, genesis_block) = self
540            .genesis
541            .iter()
542            .find(|(block_ref, _)| block_ref.author == authority)
543            .expect("Genesis should be found for authority {authority_index}");
544        genesis_block.clone()
545    }
546
547    /// Returns cached recent blocks from the specified authority.
548    /// Blocks returned are limited to round >= `start`, and cached.
549    /// NOTE: caller should not assume returned blocks are always chained.
550    /// "Disconnected" blocks can be returned when there are byzantine blocks,
551    /// or a previously evicted block is accepted again.
552    pub(crate) fn get_cached_blocks(
553        &self,
554        authority: AuthorityIndex,
555        start: Round,
556    ) -> Vec<VerifiedBlock> {
557        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
558    }
559
560    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
561    // limited in total number of blocks.
562    pub(crate) fn get_cached_blocks_in_range(
563        &self,
564        authority: AuthorityIndex,
565        start_round: Round,
566        end_round: Round,
567        limit: usize,
568    ) -> Vec<VerifiedBlock> {
569        if start_round >= end_round || limit == 0 {
570            return vec![];
571        }
572
573        let mut blocks = vec![];
574        for block_ref in self.recent_refs_by_authority[authority].range((
575            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576            Excluded(BlockRef::new(
577                end_round,
578                AuthorityIndex::MIN,
579                BlockDigest::MIN,
580            )),
581        )) {
582            let block_info = self
583                .recent_blocks
584                .get(block_ref)
585                .expect("Block should exist in recent blocks");
586            blocks.push(block_info.block.clone());
587            if blocks.len() >= limit {
588                break;
589            }
590        }
591        blocks
592    }
593
594    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
595    pub(crate) fn get_last_cached_block_in_range(
596        &self,
597        authority: AuthorityIndex,
598        start_round: Round,
599        end_round: Round,
600    ) -> Option<VerifiedBlock> {
601        if start_round >= end_round {
602            return None;
603        }
604
605        let block_ref = self.recent_refs_by_authority[authority]
606            .range((
607                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608                Excluded(BlockRef::new(
609                    end_round,
610                    AuthorityIndex::MIN,
611                    BlockDigest::MIN,
612                )),
613            ))
614            .last()?;
615
616        self.recent_blocks
617            .get(block_ref)
618            .map(|block_info| block_info.block.clone())
619    }
620
621    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
622    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
623    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
624    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
625    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
626    /// and the other equivocating blocks will be returned.
627    pub(crate) fn get_last_cached_block_per_authority(
628        &self,
629        end_round: Round,
630    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631        // Initialize with the genesis blocks as fallback
632        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635        if end_round == GENESIS_ROUND {
636            panic!(
637                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638            );
639        }
640
641        if end_round == GENESIS_ROUND + 1 {
642            return blocks.into_iter().map(|b| (b, vec![])).collect();
643        }
644
645        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646            let authority_index = self
647                .context
648                .committee
649                .to_authority_index(authority_index)
650                .unwrap();
651
652            let last_evicted_round = self.evicted_rounds[authority_index];
653            if end_round.saturating_sub(1) <= last_evicted_round {
654                panic!(
655                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656                );
657            }
658
659            let block_ref_iter = block_refs
660                .range((
661                    Included(BlockRef::new(
662                        last_evicted_round + 1,
663                        authority_index,
664                        BlockDigest::MIN,
665                    )),
666                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667                ))
668                .rev();
669
670            let mut last_round = 0;
671            for block_ref in block_ref_iter {
672                if last_round == 0 {
673                    last_round = block_ref.round;
674                    let block_info = self
675                        .recent_blocks
676                        .get(block_ref)
677                        .expect("Block should exist in recent blocks");
678                    blocks[authority_index] = block_info.block.clone();
679                    continue;
680                }
681                if block_ref.round < last_round {
682                    break;
683                }
684                equivocating_blocks[authority_index].push(*block_ref);
685            }
686        }
687
688        blocks.into_iter().zip(equivocating_blocks).collect()
689    }
690
691    /// Checks whether a block exists in the slot. The method checks only against the cached data.
692    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
693    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694        // Always return true for genesis slots.
695        if slot.round == GENESIS_ROUND {
696            return true;
697        }
698
699        let eviction_round = self.evicted_rounds[slot.authority];
700        if slot.round <= eviction_round {
701            panic!(
702                "{}",
703                format!(
704                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705                )
706            );
707        }
708
709        let mut result = self.recent_refs_by_authority[slot.authority].range((
710            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712        ));
713        result.next().is_some()
714    }
715
716    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
717    /// back the results, so its expensive if keep asking for cache missing blocks.
718    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719        let mut exist = vec![false; block_refs.len()];
720        let mut missing = Vec::new();
721
722        for (index, block_ref) in block_refs.into_iter().enumerate() {
723            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725                exist[index] = true;
726            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727            {
728                // Optimization: recent_refs contain the most recent blocks known to this authority.
729                // If a block ref is not found there and has a higher round, it definitely is
730                // missing from this authority and there is no need to check disk.
731                exist[index] = false;
732            } else {
733                missing.push((index, block_ref));
734            }
735        }
736
737        if missing.is_empty() {
738            return exist;
739        }
740
741        let missing_refs = missing
742            .iter()
743            .map(|(_, block_ref)| *block_ref)
744            .collect::<Vec<_>>();
745        let store_results = self
746            .store
747            .contains_blocks(&missing_refs)
748            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749        self.context
750            .metrics
751            .node_metrics
752            .dag_state_store_read_count
753            .with_label_values(&["contains_blocks"])
754            .inc();
755
756        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757            exist[index] = result;
758        }
759
760        exist
761    }
762
763    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764        let blocks = self.contains_blocks(vec![*block_ref]);
765        blocks.first().cloned().unwrap()
766    }
767
768    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
769    // Method will panic if the block is not found in the cache.
770    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772            if !block_info.committed {
773                block_info.committed = true;
774                return true;
775            }
776            false
777        } else {
778            panic!(
779                "Block {:?} not found in cache to set as committed.",
780                block_ref
781            );
782        }
783    }
784
785    /// Returns true if the block is committed. Only valid for blocks above the GC round.
786    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787        self.recent_blocks
788            .get(block_ref)
789            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790            .committed
791    }
792
793    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
794    /// Returns the list of blocks that are newly linked.
795    /// The returned blocks are guaranteed to be above the GC round.
796    /// Transaction votes for the returned blocks are retrieved and carried by the upcoming
797    /// proposed block.
798    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
799        let gc_round = self.gc_round();
800        let mut linked_blocks = vec![];
801        let mut targets = VecDeque::new();
802        targets.push_back(root_block);
803        while let Some(block_ref) = targets.pop_front() {
804            // No need to collect or mark blocks at or below GC round.
805            // These blocks and their causal history will not be included in new commits.
806            // And their transactions do not need votes to finalize or skip.
807            //
808            // CommitFinalizer::gced_transaction_votes_for_pending_block() is the counterpart
809            // to this logic, when deciding if block A in the causal history of block B gets
810            // implicit accept transaction votes from block B.
811            if block_ref.round <= gc_round {
812                continue;
813            }
814            let block_info = self
815                .recent_blocks
816                .get_mut(&block_ref)
817                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
818            if block_info.included {
819                continue;
820            }
821            linked_blocks.push(block_ref);
822            block_info.included = true;
823            targets.extend(block_info.block.ancestors().iter());
824        }
825        linked_blocks
826    }
827
828    /// Returns true if the block has been included in an owned proposed block.
829    /// NOTE: caller should make sure only blocks above GC round are queried.
830    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
831        self.recent_blocks
832            .get(block_ref)
833            .unwrap_or_else(|| {
834                panic!(
835                    "Attempted to query for inclusion status for a block not in cached data {}",
836                    block_ref
837                )
838            })
839            .included
840    }
841
842    pub(crate) fn threshold_clock_round(&self) -> Round {
843        self.threshold_clock.get_round()
844    }
845
846    // The timestamp of when quorum threshold was last reached in the threshold clock.
847    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
848        self.threshold_clock.get_quorum_ts()
849    }
850
851    pub(crate) fn highest_accepted_round(&self) -> Round {
852        self.highest_accepted_round
853    }
854
855    // Buffers a new commit in memory and updates last committed rounds.
856    // REQUIRED: must not skip over any commit index.
857    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
858        let time_diff = if let Some(last_commit) = &self.last_commit {
859            if commit.index() <= last_commit.index() {
860                error!(
861                    "New commit index {} <= last commit index {}!",
862                    commit.index(),
863                    last_commit.index()
864                );
865                return;
866            }
867            assert_eq!(commit.index(), last_commit.index() + 1);
868
869            if commit.timestamp_ms() < last_commit.timestamp_ms() {
870                panic!(
871                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
872                    last_commit, commit
873                );
874            }
875            commit
876                .timestamp_ms()
877                .saturating_sub(last_commit.timestamp_ms())
878        } else {
879            assert_eq!(commit.index(), 1);
880            0
881        };
882
883        self.context
884            .metrics
885            .node_metrics
886            .last_commit_time_diff
887            .observe(time_diff as f64);
888
889        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
890            previous_commit.round() < commit.round()
891        } else {
892            true
893        };
894
895        self.last_commit = Some(commit.clone());
896
897        if commit_round_advanced {
898            let now = std::time::Instant::now();
899            if let Some(previous_time) = self.last_commit_round_advancement_time {
900                self.context
901                    .metrics
902                    .node_metrics
903                    .commit_round_advancement_interval
904                    .observe(now.duration_since(previous_time).as_secs_f64())
905            }
906            self.last_commit_round_advancement_time = Some(now);
907        }
908
909        for block_ref in commit.blocks().iter() {
910            self.last_committed_rounds[block_ref.author] = max(
911                self.last_committed_rounds[block_ref.author],
912                block_ref.round,
913            );
914        }
915
916        for (i, round) in self.last_committed_rounds.iter().enumerate() {
917            let index = self.context.committee.to_authority_index(i).unwrap();
918            let hostname = &self.context.committee.authority(index).hostname;
919            self.context
920                .metrics
921                .node_metrics
922                .last_committed_authority_round
923                .with_label_values(&[hostname])
924                .set((*round).into());
925        }
926
927        self.pending_commit_votes.push_back(commit.reference());
928        self.commits_to_write.push(commit);
929    }
930
931    /// Recovers commits to write from storage, at startup.
932    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
933        self.commits_to_write.extend(commits);
934    }
935
936    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
937        assert!(
938            self.commits_to_write.is_empty(),
939            "Commits to write should be empty. {:?}",
940            self.commits_to_write,
941        );
942    }
943
944    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
945        // We create an empty scoring subdag once reputation scores are calculated.
946        // Note: It is okay for this to not be gated by protocol config as the
947        // scoring_subdag should be empty in either case at this point.
948        assert!(self.scoring_subdag.is_empty());
949
950        let commit_info = CommitInfo {
951            committed_rounds: self.last_committed_rounds.clone(),
952            reputation_scores,
953        };
954        let last_commit = self
955            .last_commit
956            .as_ref()
957            .expect("Last commit should already be set.");
958        self.commit_info_to_write
959            .push((last_commit.reference(), commit_info));
960    }
961
962    pub(crate) fn add_finalized_commit(
963        &mut self,
964        commit_ref: CommitRef,
965        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
966    ) {
967        self.finalized_commits_to_write
968            .push((commit_ref, rejected_transactions));
969    }
970
971    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
972        let mut votes = Vec::new();
973        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
974            votes.push(self.pending_commit_votes.pop_front().unwrap());
975        }
976        votes
977    }
978
979    /// Index of the last commit.
980    pub(crate) fn last_commit_index(&self) -> CommitIndex {
981        match &self.last_commit {
982            Some(commit) => commit.index(),
983            None => 0,
984        }
985    }
986
987    /// Digest of the last commit.
988    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
989        match &self.last_commit {
990            Some(commit) => commit.digest(),
991            None => CommitDigest::MIN,
992        }
993    }
994
995    /// Timestamp of the last commit.
996    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
997        match &self.last_commit {
998            Some(commit) => commit.timestamp_ms(),
999            None => 0,
1000        }
1001    }
1002
1003    /// Leader slot of the last commit.
1004    pub(crate) fn last_commit_leader(&self) -> Slot {
1005        match &self.last_commit {
1006            Some(commit) => commit.leader().into(),
1007            None => self
1008                .genesis
1009                .iter()
1010                .next()
1011                .map(|(genesis_ref, _)| *genesis_ref)
1012                .expect("Genesis blocks should always be available.")
1013                .into(),
1014        }
1015    }
1016
1017    /// Highest round where a block is committed, which is last commit's leader round.
1018    pub(crate) fn last_commit_round(&self) -> Round {
1019        match &self.last_commit {
1020            Some(commit) => commit.leader().round,
1021            None => 0,
1022        }
1023    }
1024
1025    /// Last committed round per authority.
1026    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1027        self.last_committed_rounds.clone()
1028    }
1029
1030    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
1031    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
1032    /// committed leader round. When GC is disabled that will return the genesis round.
1033    pub(crate) fn gc_round(&self) -> Round {
1034        self.calculate_gc_round(self.last_commit_round())
1035    }
1036
1037    /// Calculates the GC round from the input leader round, which can be different
1038    /// from the last committed leader round.
1039    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1040        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1041    }
1042
1043    /// Flushes unpersisted blocks, commits and commit info to storage.
1044    ///
1045    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
1046    /// must also be buffered.
1047    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
1048    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
1049    ///
1050    /// After each flush, DagState becomes persisted in storage and it expected to recover
1051    /// all internal states from storage after restarts.
1052    pub(crate) fn flush(&mut self) {
1053        let _s = self
1054            .context
1055            .metrics
1056            .node_metrics
1057            .scope_processing_time
1058            .with_label_values(&["DagState::flush"])
1059            .start_timer();
1060
1061        // Flush buffered data to storage.
1062        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1063        let pending_commits = std::mem::take(&mut self.commits_to_write);
1064        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1065        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1066        if pending_blocks.is_empty()
1067            && pending_commits.is_empty()
1068            && pending_commit_info.is_empty()
1069            && pending_finalized_commits.is_empty()
1070        {
1071            return;
1072        }
1073
1074        debug!(
1075            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1076            pending_blocks.len(),
1077            pending_blocks
1078                .iter()
1079                .map(|b| b.reference().to_string())
1080                .join(","),
1081            pending_commits.len(),
1082            pending_commits
1083                .iter()
1084                .map(|c| c.reference().to_string())
1085                .join(","),
1086            pending_commit_info.len(),
1087            pending_commit_info
1088                .iter()
1089                .map(|(commit_ref, _)| commit_ref.to_string())
1090                .join(","),
1091            pending_finalized_commits.len(),
1092            pending_finalized_commits
1093                .iter()
1094                .map(|(commit_ref, _)| commit_ref.to_string())
1095                .join(","),
1096        );
1097        self.store
1098            .write(WriteBatch::new(
1099                pending_blocks,
1100                pending_commits,
1101                pending_commit_info,
1102                pending_finalized_commits,
1103            ))
1104            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1105        self.context
1106            .metrics
1107            .node_metrics
1108            .dag_state_store_write_count
1109            .inc();
1110
1111        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
1112        for (authority_index, _) in self.context.committee.authorities() {
1113            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1114            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1115                if block_ref.round <= eviction_round {
1116                    self.recent_blocks.remove(block_ref);
1117                    self.recent_refs_by_authority[authority_index].pop_first();
1118                } else {
1119                    break;
1120                }
1121            }
1122            self.evicted_rounds[authority_index] = eviction_round;
1123        }
1124
1125        let metrics = &self.context.metrics.node_metrics;
1126        metrics
1127            .dag_state_recent_blocks
1128            .set(self.recent_blocks.len() as i64);
1129        metrics.dag_state_recent_refs.set(
1130            self.recent_refs_by_authority
1131                .iter()
1132                .map(BTreeSet::len)
1133                .sum::<usize>() as i64,
1134        );
1135    }
1136
1137    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1138        self.store
1139            .read_last_commit_info()
1140            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1141    }
1142
1143    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1144        self.scoring_subdag.add_subdags(scoring_subdags);
1145    }
1146
1147    pub(crate) fn clear_scoring_subdag(&mut self) {
1148        self.scoring_subdag.clear();
1149    }
1150
1151    pub(crate) fn scoring_subdags_count(&self) -> usize {
1152        self.scoring_subdag.scored_subdags_count()
1153    }
1154
1155    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1156        self.scoring_subdag.is_empty()
1157    }
1158
1159    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1160        self.scoring_subdag.calculate_distributed_vote_scores()
1161    }
1162
1163    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1164        self.scoring_subdag
1165            .commit_range
1166            .as_ref()
1167            .expect("commit range should exist for scoring subdag")
1168            .end()
1169    }
1170
1171    /// The last round that should get evicted after a cache clean up operation. After this round we are
1172    /// guaranteed to have all the produced blocks from that authority. For any round that is
1173    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1174    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1175        let last_round = self.recent_refs_by_authority[authority_index]
1176            .last()
1177            .map(|block_ref| block_ref.round)
1178            .unwrap_or(GENESIS_ROUND);
1179
1180        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1181    }
1182
1183    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1184    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1185    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1186        gc_round.min(last_round.saturating_sub(cached_rounds))
1187    }
1188
1189    /// Returns the underlying store.
1190    pub(crate) fn store(&self) -> Arc<dyn Store> {
1191        self.store.clone()
1192    }
1193
1194    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1195    /// the quorum even if that's genesis.
1196    #[cfg(test)]
1197    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1198        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1199        // a quorum then it means that our DAG has advanced with missing causal history.
1200        for round in
1201            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1202        {
1203            if round == GENESIS_ROUND {
1204                return self.genesis_blocks();
1205            }
1206            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1207            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1208
1209            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1210            let blocks = self.get_uncommitted_blocks_at_round(round);
1211            for block in &blocks {
1212                if quorum.add(block.author(), &self.context.committee) {
1213                    return blocks;
1214                }
1215            }
1216        }
1217
1218        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1219    }
1220
1221    #[cfg(test)]
1222    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1223        self.genesis.values().cloned().collect()
1224    }
1225
1226    #[cfg(test)]
1227    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1228        self.last_commit = Some(commit);
1229    }
1230}
1231
1232struct BlockInfo {
1233    block: VerifiedBlock,
1234    // Whether the block has been committed
1235    committed: bool,
1236    // Whether the block has been included in the causal history of an owned proposed block.
1237    ///
1238    /// There are two usages of this field:
1239    /// 1. When proposing blocks, determine the set of blocks to carry votes for.
1240    /// 2. When recovering, determine if a block has not been included in a proposed block and
1241    ///    should recover transaction votes by voting.
1242    included: bool,
1243}
1244
1245impl BlockInfo {
1246    fn new(block: VerifiedBlock) -> Self {
1247        Self {
1248            block,
1249            committed: false,
1250            included: false,
1251        }
1252    }
1253}
1254
1255#[cfg(test)]
1256mod test {
1257    use std::vec;
1258
1259    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1260    use parking_lot::RwLock;
1261
1262    use super::*;
1263    use crate::{
1264        block::{TestBlock, VerifiedBlock},
1265        storage::{WriteBatch, mem_store::MemStore},
1266        test_dag_builder::DagBuilder,
1267        test_dag_parser::parse_dag,
1268    };
1269
1270    #[tokio::test]
1271    async fn test_get_blocks() {
1272        let (context, _) = Context::new_for_test(4);
1273        let context = Arc::new(context);
1274        let store = Arc::new(MemStore::new());
1275        let mut dag_state = DagState::new(context.clone(), store.clone());
1276        let own_index = AuthorityIndex::new_for_test(0);
1277
1278        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1279        let num_rounds: u32 = 10;
1280        let non_existent_round: u32 = 100;
1281        let num_authorities: u32 = 3;
1282        let num_blocks_per_slot: usize = 3;
1283        let mut blocks = BTreeMap::new();
1284        for round in 1..=num_rounds {
1285            for author in 0..num_authorities {
1286                // Create 3 blocks per slot, with different timestamps and digests.
1287                let base_ts = round as BlockTimestampMs * 1000;
1288                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1289                    let block = VerifiedBlock::new_for_test(
1290                        TestBlock::new(round, author)
1291                            .set_timestamp_ms(timestamp)
1292                            .build(),
1293                    );
1294                    dag_state.accept_block(block.clone());
1295                    blocks.insert(block.reference(), block);
1296
1297                    // Only write one block per slot for own index
1298                    if AuthorityIndex::new_for_test(author) == own_index {
1299                        break;
1300                    }
1301                }
1302            }
1303        }
1304
1305        // Check uncommitted blocks that exist.
1306        for (r, block) in &blocks {
1307            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1308        }
1309
1310        // Check uncommitted blocks that do not exist.
1311        let last_ref = blocks.keys().last().unwrap();
1312        assert!(
1313            dag_state
1314                .get_block(&BlockRef::new(
1315                    last_ref.round,
1316                    last_ref.author,
1317                    BlockDigest::MIN
1318                ))
1319                .is_none()
1320        );
1321
1322        // Check slots with uncommitted blocks.
1323        for round in 1..=num_rounds {
1324            for author in 0..num_authorities {
1325                let slot = Slot::new(
1326                    round,
1327                    context
1328                        .committee
1329                        .to_authority_index(author as usize)
1330                        .unwrap(),
1331                );
1332                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1333
1334                // We only write one block per slot for own index
1335                if AuthorityIndex::new_for_test(author) == own_index {
1336                    assert_eq!(blocks.len(), 1);
1337                } else {
1338                    assert_eq!(blocks.len(), num_blocks_per_slot);
1339                }
1340
1341                for b in blocks {
1342                    assert_eq!(b.round(), round);
1343                    assert_eq!(
1344                        b.author(),
1345                        context
1346                            .committee
1347                            .to_authority_index(author as usize)
1348                            .unwrap()
1349                    );
1350                }
1351            }
1352        }
1353
1354        // Check slots without uncommitted blocks.
1355        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1356        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1357
1358        // Check rounds with uncommitted blocks.
1359        for round in 1..=num_rounds {
1360            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1361            // Expect 3 blocks per authority except for own authority which should
1362            // have 1 block.
1363            assert_eq!(
1364                blocks.len(),
1365                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1366            );
1367            for b in blocks {
1368                assert_eq!(b.round(), round);
1369            }
1370        }
1371
1372        // Check rounds without uncommitted blocks.
1373        assert!(
1374            dag_state
1375                .get_uncommitted_blocks_at_round(non_existent_round)
1376                .is_empty()
1377        );
1378    }
1379
1380    #[tokio::test]
1381    async fn test_ancestors_at_uncommitted_round() {
1382        // Initialize DagState.
1383        let (context, _) = Context::new_for_test(4);
1384        let context = Arc::new(context);
1385        let store = Arc::new(MemStore::new());
1386        let mut dag_state = DagState::new(context.clone(), store.clone());
1387
1388        // Populate DagState.
1389
1390        // Round 10 refs will not have their blocks in DagState.
1391        let round_10_refs: Vec<_> = (0..4)
1392            .map(|a| {
1393                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1394                    .reference()
1395            })
1396            .collect();
1397
1398        // Round 11 blocks.
1399        let round_11 = vec![
1400            // This will connect to round 12.
1401            VerifiedBlock::new_for_test(
1402                TestBlock::new(11, 0)
1403                    .set_timestamp_ms(1100)
1404                    .set_ancestors(round_10_refs.clone())
1405                    .build(),
1406            ),
1407            // Slot(11, 1) has 3 blocks.
1408            // This will connect to round 12.
1409            VerifiedBlock::new_for_test(
1410                TestBlock::new(11, 1)
1411                    .set_timestamp_ms(1110)
1412                    .set_ancestors(round_10_refs.clone())
1413                    .build(),
1414            ),
1415            // This will connect to round 13.
1416            VerifiedBlock::new_for_test(
1417                TestBlock::new(11, 1)
1418                    .set_timestamp_ms(1111)
1419                    .set_ancestors(round_10_refs.clone())
1420                    .build(),
1421            ),
1422            // This will not connect to any block.
1423            VerifiedBlock::new_for_test(
1424                TestBlock::new(11, 1)
1425                    .set_timestamp_ms(1112)
1426                    .set_ancestors(round_10_refs.clone())
1427                    .build(),
1428            ),
1429            // This will not connect to any block.
1430            VerifiedBlock::new_for_test(
1431                TestBlock::new(11, 2)
1432                    .set_timestamp_ms(1120)
1433                    .set_ancestors(round_10_refs.clone())
1434                    .build(),
1435            ),
1436            // This will connect to round 12.
1437            VerifiedBlock::new_for_test(
1438                TestBlock::new(11, 3)
1439                    .set_timestamp_ms(1130)
1440                    .set_ancestors(round_10_refs.clone())
1441                    .build(),
1442            ),
1443        ];
1444
1445        // Round 12 blocks.
1446        let ancestors_for_round_12 = vec![
1447            round_11[0].reference(),
1448            round_11[1].reference(),
1449            round_11[5].reference(),
1450        ];
1451        let round_12 = vec![
1452            VerifiedBlock::new_for_test(
1453                TestBlock::new(12, 0)
1454                    .set_timestamp_ms(1200)
1455                    .set_ancestors(ancestors_for_round_12.clone())
1456                    .build(),
1457            ),
1458            VerifiedBlock::new_for_test(
1459                TestBlock::new(12, 2)
1460                    .set_timestamp_ms(1220)
1461                    .set_ancestors(ancestors_for_round_12.clone())
1462                    .build(),
1463            ),
1464            VerifiedBlock::new_for_test(
1465                TestBlock::new(12, 3)
1466                    .set_timestamp_ms(1230)
1467                    .set_ancestors(ancestors_for_round_12.clone())
1468                    .build(),
1469            ),
1470        ];
1471
1472        // Round 13 blocks.
1473        let ancestors_for_round_13 = vec![
1474            round_12[0].reference(),
1475            round_12[1].reference(),
1476            round_12[2].reference(),
1477            round_11[2].reference(),
1478        ];
1479        let round_13 = vec![
1480            VerifiedBlock::new_for_test(
1481                TestBlock::new(12, 1)
1482                    .set_timestamp_ms(1300)
1483                    .set_ancestors(ancestors_for_round_13.clone())
1484                    .build(),
1485            ),
1486            VerifiedBlock::new_for_test(
1487                TestBlock::new(12, 2)
1488                    .set_timestamp_ms(1320)
1489                    .set_ancestors(ancestors_for_round_13.clone())
1490                    .build(),
1491            ),
1492            VerifiedBlock::new_for_test(
1493                TestBlock::new(12, 3)
1494                    .set_timestamp_ms(1330)
1495                    .set_ancestors(ancestors_for_round_13.clone())
1496                    .build(),
1497            ),
1498        ];
1499
1500        // Round 14 anchor block.
1501        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1502        let anchor = VerifiedBlock::new_for_test(
1503            TestBlock::new(14, 1)
1504                .set_timestamp_ms(1410)
1505                .set_ancestors(ancestors_for_round_14)
1506                .build(),
1507        );
1508
1509        // Add all blocks (at and above round 11) to DagState.
1510        for b in round_11
1511            .iter()
1512            .chain(round_12.iter())
1513            .chain(round_13.iter())
1514            .chain([anchor.clone()].iter())
1515        {
1516            dag_state.accept_block(b.clone());
1517        }
1518
1519        // Check ancestors connected to anchor.
1520        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1521        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1522        ancestors_refs.sort();
1523        let mut expected_refs = vec![
1524            round_11[0].reference(),
1525            round_11[1].reference(),
1526            round_11[2].reference(),
1527            round_11[5].reference(),
1528        ];
1529        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1530        assert_eq!(
1531            ancestors_refs, expected_refs,
1532            "Expected round 11 ancestors: {:?}. Got: {:?}",
1533            expected_refs, ancestors_refs
1534        );
1535    }
1536
1537    #[tokio::test]
1538    async fn test_link_causal_history() {
1539        let (mut context, _) = Context::new_for_test(4);
1540        context.parameters.dag_state_cached_rounds = 10;
1541        context
1542            .protocol_config
1543            .set_consensus_gc_depth_for_testing(3);
1544        let context = Arc::new(context);
1545
1546        let store = Arc::new(MemStore::new());
1547        let mut dag_state = DagState::new(context.clone(), store.clone());
1548
1549        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1550        let mut dag_builder = DagBuilder::new(context.clone());
1551        dag_builder.layers(1..=3).build();
1552        dag_builder
1553            .layers(4..=6)
1554            .authorities(vec![AuthorityIndex::new_for_test(0)])
1555            .skip_block()
1556            .build();
1557
1558        // Accept all blocks
1559        let all_blocks = dag_builder.all_blocks();
1560        dag_state.accept_blocks(all_blocks.clone());
1561
1562        // No block is linked yet.
1563        for block in &all_blocks {
1564            assert!(!dag_state.has_been_included(&block.reference()));
1565        }
1566
1567        // Link causal history from a round 1 block.
1568        let round_1_block = &all_blocks[1];
1569        assert_eq!(round_1_block.round(), 1);
1570        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1571
1572        // Check that the block is linked.
1573        assert_eq!(linked_blocks.len(), 1);
1574        assert_eq!(linked_blocks[0], round_1_block.reference());
1575        for block_ref in linked_blocks {
1576            assert!(dag_state.has_been_included(&block_ref));
1577        }
1578
1579        // Link causal history from a round 2 block.
1580        let round_2_block = &all_blocks[4];
1581        assert_eq!(round_2_block.round(), 2);
1582        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1583
1584        // Check the linked blocks.
1585        assert_eq!(linked_blocks.len(), 4);
1586        for block_ref in linked_blocks {
1587            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1588        }
1589
1590        // Check linked status in dag state.
1591        for block in &all_blocks {
1592            if block.round() == 1 || block.reference() == round_2_block.reference() {
1593                assert!(dag_state.has_been_included(&block.reference()));
1594            } else {
1595                assert!(!dag_state.has_been_included(&block.reference()));
1596            }
1597        }
1598
1599        // Select round 6 block.
1600        let round_6_block = all_blocks.last().unwrap();
1601        assert_eq!(round_6_block.round(), 6);
1602
1603        // Get GC round to 3.
1604        let last_commit = TrustedCommit::new_for_test(
1605            6,
1606            CommitDigest::MIN,
1607            context.clock.timestamp_utc_ms(),
1608            round_6_block.reference(),
1609            vec![],
1610        );
1611        dag_state.set_last_commit(last_commit);
1612        assert_eq!(
1613            dag_state.gc_round(),
1614            3,
1615            "GC round should have moved to round 3"
1616        );
1617
1618        // Link causal history from a round 6 block.
1619        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1620
1621        // Check the linked blocks. They should not include GC'ed blocks.
1622        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1623        for block_ref in linked_blocks {
1624            assert!(
1625                block_ref.round == 4
1626                    || block_ref.round == 5
1627                    || block_ref == round_6_block.reference()
1628            );
1629        }
1630
1631        // Check linked status in dag state.
1632        for block in &all_blocks {
1633            let block_ref = block.reference();
1634            if block.round() == 1
1635                || block_ref == round_2_block.reference()
1636                || block_ref.round == 4
1637                || block_ref.round == 5
1638                || block_ref == round_6_block.reference()
1639            {
1640                assert!(dag_state.has_been_included(&block.reference()));
1641            } else {
1642                assert!(!dag_state.has_been_included(&block.reference()));
1643            }
1644        }
1645    }
1646
1647    #[tokio::test]
1648    async fn test_contains_blocks_in_cache_or_store() {
1649        /// Only keep elements up to 2 rounds before the last committed round
1650        const CACHED_ROUNDS: Round = 2;
1651
1652        let (mut context, _) = Context::new_for_test(4);
1653        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1654
1655        let context = Arc::new(context);
1656        let store = Arc::new(MemStore::new());
1657        let mut dag_state = DagState::new(context.clone(), store.clone());
1658
1659        // Create test blocks for round 1 ~ 10
1660        let num_rounds: u32 = 10;
1661        let num_authorities: u32 = 4;
1662        let mut blocks = Vec::new();
1663
1664        for round in 1..=num_rounds {
1665            for author in 0..num_authorities {
1666                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1667                blocks.push(block);
1668            }
1669        }
1670
1671        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1672        blocks.clone().into_iter().for_each(|block| {
1673            if block.round() <= 4 {
1674                store
1675                    .write(WriteBatch::default().blocks(vec![block]))
1676                    .unwrap();
1677            } else {
1678                dag_state.accept_blocks(vec![block]);
1679            }
1680        });
1681
1682        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1683        // where the blocks of first 4 round should be found in DagState and the rest in store.
1684        let mut block_refs = blocks
1685            .iter()
1686            .map(|block| block.reference())
1687            .collect::<Vec<_>>();
1688        let result = dag_state.contains_blocks(block_refs.clone());
1689
1690        // Ensure everything is found
1691        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1692        assert_eq!(result, expected);
1693
1694        // Now try to ask also for one block ref that is neither in cache nor in store
1695        block_refs.insert(
1696            3,
1697            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1698        );
1699        let result = dag_state.contains_blocks(block_refs.clone());
1700
1701        // Then all should be found apart from the last one
1702        expected.insert(3, false);
1703        assert_eq!(result, expected.clone());
1704    }
1705
1706    #[tokio::test]
1707    async fn test_contains_cached_block_at_slot() {
1708        /// Only keep elements up to 2 rounds before the last committed round
1709        const CACHED_ROUNDS: Round = 2;
1710
1711        let num_authorities: u32 = 4;
1712        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1713        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1714
1715        let context = Arc::new(context);
1716        let store = Arc::new(MemStore::new());
1717        let mut dag_state = DagState::new(context.clone(), store.clone());
1718
1719        // Create test blocks for round 1 ~ 10
1720        let num_rounds: u32 = 10;
1721        let mut blocks = Vec::new();
1722
1723        for round in 1..=num_rounds {
1724            for author in 0..num_authorities {
1725                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1726                blocks.push(block.clone());
1727                dag_state.accept_block(block);
1728            }
1729        }
1730
1731        // Query for genesis round 0, genesis blocks should be returned
1732        for (author, _) in context.committee.authorities() {
1733            assert!(
1734                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1735                "Genesis should always be found"
1736            );
1737        }
1738
1739        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1740        // where the blocks of first 4 round should be found in DagState and the rest in store.
1741        let mut block_refs = blocks
1742            .iter()
1743            .map(|block| block.reference())
1744            .collect::<Vec<_>>();
1745
1746        for block_ref in block_refs.clone() {
1747            let slot = block_ref.into();
1748            let found = dag_state.contains_cached_block_at_slot(slot);
1749            assert!(found, "A block should be found at slot {}", slot);
1750        }
1751
1752        // Now try to ask also for one block ref that is not in cache
1753        // Then all should be found apart from the last one
1754        block_refs.insert(
1755            3,
1756            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1757        );
1758        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1759        expected.insert(3, false);
1760
1761        // Attempt to check the same for via the contains slot method
1762        for block_ref in block_refs {
1763            let slot = block_ref.into();
1764            let found = dag_state.contains_cached_block_at_slot(slot);
1765
1766            assert_eq!(expected.remove(0), found);
1767        }
1768    }
1769
1770    #[tokio::test]
1771    #[ignore]
1772    #[should_panic(
1773        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1774    )]
1775    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1776        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1777        /// for the correct node operation.
1778        const GC_DEPTH: u32 = 2;
1779        /// Keep at least 3 rounds in cache for each authority.
1780        const CACHED_ROUNDS: Round = 3;
1781
1782        let (mut context, _) = Context::new_for_test(4);
1783        context
1784            .protocol_config
1785            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1786        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1787
1788        let context = Arc::new(context);
1789        let store = Arc::new(MemStore::new());
1790        let mut dag_state = DagState::new(context.clone(), store.clone());
1791
1792        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1793        let mut dag_builder = DagBuilder::new(context.clone());
1794        dag_builder.layers(1..=3).build();
1795        dag_builder
1796            .layers(4..=6)
1797            .authorities(vec![AuthorityIndex::new_for_test(0)])
1798            .skip_block()
1799            .build();
1800
1801        // Accept all blocks
1802        dag_builder
1803            .all_blocks()
1804            .into_iter()
1805            .for_each(|block| dag_state.accept_block(block));
1806
1807        // Now add a commit for leader round 5 to trigger an eviction
1808        dag_state.add_commit(TrustedCommit::new_for_test(
1809            1 as CommitIndex,
1810            CommitDigest::MIN,
1811            0,
1812            dag_builder.leader_block(5).unwrap().reference(),
1813            vec![],
1814        ));
1815        // Flush the DAG state to storage.
1816        dag_state.flush();
1817
1818        // Ensure that gc round has been updated
1819        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1820
1821        // Now what we expect to happen is for:
1822        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1823        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1824        for authority_index in 1..=3 {
1825            for round in 4..=6 {
1826                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1827                    round,
1828                    AuthorityIndex::new_for_test(authority_index)
1829                )));
1830            }
1831        }
1832
1833        for round in 1..=3 {
1834            assert!(
1835                dag_state.contains_cached_block_at_slot(Slot::new(
1836                    round,
1837                    AuthorityIndex::new_for_test(0)
1838                ))
1839            );
1840        }
1841
1842        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1843        // that is <= 3 should be evicted
1844        let _ =
1845            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1846    }
1847
1848    #[tokio::test]
1849    async fn test_get_blocks_in_cache_or_store() {
1850        let (context, _) = Context::new_for_test(4);
1851        let context = Arc::new(context);
1852        let store = Arc::new(MemStore::new());
1853        let mut dag_state = DagState::new(context.clone(), store.clone());
1854
1855        // Create test blocks for round 1 ~ 10
1856        let num_rounds: u32 = 10;
1857        let num_authorities: u32 = 4;
1858        let mut blocks = Vec::new();
1859
1860        for round in 1..=num_rounds {
1861            for author in 0..num_authorities {
1862                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1863                blocks.push(block);
1864            }
1865        }
1866
1867        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1868        blocks.clone().into_iter().for_each(|block| {
1869            if block.round() <= 4 {
1870                store
1871                    .write(WriteBatch::default().blocks(vec![block]))
1872                    .unwrap();
1873            } else {
1874                dag_state.accept_blocks(vec![block]);
1875            }
1876        });
1877
1878        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1879        // where the blocks of first 4 round should be found in DagState and the rest in store.
1880        let mut block_refs = blocks
1881            .iter()
1882            .map(|block| block.reference())
1883            .collect::<Vec<_>>();
1884        let result = dag_state.get_blocks(&block_refs);
1885
1886        let mut expected = blocks
1887            .into_iter()
1888            .map(Some)
1889            .collect::<Vec<Option<VerifiedBlock>>>();
1890
1891        // Ensure everything is found
1892        assert_eq!(result, expected.clone());
1893
1894        // Now try to ask also for one block ref that is neither in cache nor in store
1895        block_refs.insert(
1896            3,
1897            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1898        );
1899        let result = dag_state.get_blocks(&block_refs);
1900
1901        // Then all should be found apart from the last one
1902        expected.insert(3, None);
1903        assert_eq!(result, expected);
1904    }
1905
1906    #[tokio::test]
1907    async fn test_flush_and_recovery() {
1908        telemetry_subscribers::init_for_testing();
1909
1910        const GC_DEPTH: u32 = 3;
1911        const CACHED_ROUNDS: u32 = 4;
1912
1913        let num_authorities: u32 = 4;
1914        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1915        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1916        context
1917            .protocol_config
1918            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1919
1920        let context = Arc::new(context);
1921
1922        let store = Arc::new(MemStore::new());
1923        let mut dag_state = DagState::new(context.clone(), store.clone());
1924
1925        const NUM_ROUNDS: Round = 20;
1926        let mut dag_builder = DagBuilder::new(context.clone());
1927        dag_builder.layers(1..=5).build();
1928        dag_builder
1929            .layers(6..=8)
1930            .authorities(vec![AuthorityIndex::new_for_test(0)])
1931            .skip_block()
1932            .build();
1933        dag_builder.layers(9..=NUM_ROUNDS).build();
1934
1935        // Get all commits from the DAG builder.
1936        const LAST_COMMIT_ROUND: Round = 16;
1937        const LAST_COMMIT_INDEX: CommitIndex = 15;
1938        let commits = dag_builder
1939            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1940            .into_iter()
1941            .map(|(_subdag, commit)| commit)
1942            .take(LAST_COMMIT_INDEX as usize)
1943            .collect::<Vec<_>>();
1944        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1945        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1946
1947        // Add the blocks from first 11 rounds and first 8 commits to the dag state
1948        // Note that the commit of round 8 is missing because where authority 0 is the leader but produced no block.
1949        // So commit 8 has leader round 9.
1950        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1951        const NUM_PERSISTED_COMMITS: usize = 8;
1952        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1953        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1954        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1955        let mut finalized_commits = vec![];
1956        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1957            finalized_commits.push(commit.clone());
1958            dag_state.add_commit(commit);
1959        }
1960        let last_finalized_commit = finalized_commits.last().unwrap();
1961        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1962        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1963
1964        // Collect finalized blocks.
1965        let finalized_blocks = finalized_commits
1966            .iter()
1967            .flat_map(|commit| commit.blocks())
1968            .collect::<BTreeSet<_>>();
1969
1970        // Flush commits from the dag state
1971        dag_state.flush();
1972
1973        // Verify the store has blocks up to round 12, and commits up to index 8.
1974        let store_blocks = store
1975            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1976            .unwrap();
1977        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1978        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1979        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1980        assert_eq!(
1981            store_commits.last().unwrap().index(),
1982            LAST_PERSISTED_COMMIT_INDEX
1983        );
1984        assert_eq!(
1985            store_commits.last().unwrap().round(),
1986            LAST_PERSISTED_COMMIT_ROUND
1987        );
1988
1989        // Add the rest of the blocks and commits to the dag state
1990        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1991        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1992            dag_state.add_commit(commit);
1993        }
1994
1995        // All blocks should be found in DagState.
1996        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
1997        let block_refs = all_blocks
1998            .iter()
1999            .map(|block| block.reference())
2000            .collect::<Vec<_>>();
2001        let result = dag_state
2002            .get_blocks(&block_refs)
2003            .into_iter()
2004            .map(|b| b.unwrap())
2005            .collect::<Vec<_>>();
2006        assert_eq!(result, all_blocks);
2007
2008        // Last commit index from DagState should now be 15
2009        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2010
2011        // Destroy the dag state without flushing additional data.
2012        drop(dag_state);
2013
2014        // Recover the state from the store
2015        let dag_state = DagState::new(context.clone(), store.clone());
2016
2017        // Persisted blocks rounds should be found in DagState.
2018        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2019        let block_refs = all_blocks
2020            .iter()
2021            .map(|block| block.reference())
2022            .collect::<Vec<_>>();
2023        let result = dag_state
2024            .get_blocks(&block_refs)
2025            .into_iter()
2026            .map(|b| b.unwrap())
2027            .collect::<Vec<_>>();
2028        assert_eq!(result, all_blocks);
2029
2030        // Unpersisted blocks should not be in DagState, because they are not flushed.
2031        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2032        let block_refs = missing_blocks
2033            .iter()
2034            .map(|block| block.reference())
2035            .collect::<Vec<_>>();
2036        let retrieved_blocks = dag_state
2037            .get_blocks(&block_refs)
2038            .into_iter()
2039            .flatten()
2040            .collect::<Vec<_>>();
2041        assert!(retrieved_blocks.is_empty());
2042
2043        // Recovered last commit index and round should be 8 and 9.
2044        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2045        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2046
2047        // The last_commit_rounds of the finalized commits should have been recovered.
2048        let expected_last_committed_rounds = vec![5, 9, 8, 8];
2049        assert_eq!(
2050            dag_state.last_committed_rounds(),
2051            expected_last_committed_rounds
2052        );
2053        // Unscored subdags will be recovered based on the flushed commits and no commit info.
2054        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2055
2056        // Ensure that cached blocks exist only for specific rounds per authority
2057        for (authority_index, _) in context.committee.authorities() {
2058            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2059
2060            // Ensure that eviction rounds have been properly recovered.
2061            // For every authority, the gc round is 9 - 3 = 6, and cached round is 12-5 = 7.
2062            // So eviction round is the min which is 6.
2063            if authority_index == AuthorityIndex::new_for_test(0) {
2064                assert_eq!(blocks.len(), 4);
2065                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2066                assert!(
2067                    blocks
2068                        .into_iter()
2069                        .all(|block| block.round() >= 7 && block.round() <= 12)
2070                );
2071            } else {
2072                assert_eq!(blocks.len(), 6);
2073                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2074                assert!(
2075                    blocks
2076                        .into_iter()
2077                        .all(|block| block.round() >= 7 && block.round() <= 12)
2078                );
2079            }
2080        }
2081
2082        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
2083        let gc_round = dag_state.gc_round();
2084        assert_eq!(gc_round, 6);
2085        dag_state
2086            .recent_blocks
2087            .iter()
2088            .for_each(|(block_ref, block_info)| {
2089                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2090                    assert!(
2091                        block_info.committed,
2092                        "Block {:?} should be set as committed",
2093                        block_ref
2094                    );
2095                }
2096            });
2097
2098        // Ensure the hard linked status of blocks are recovered.
2099        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
2100        // Other blocks (round 12 but not from authority 0) should not be hard linked.
2101        // This is because authority 0 blocks are considered proposed blocks.
2102        dag_state
2103            .recent_blocks
2104            .iter()
2105            .for_each(|(block_ref, block_info)| {
2106                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2107                    assert!(block_info.included);
2108                } else {
2109                    assert!(!block_info.included);
2110                }
2111            });
2112    }
2113
2114    #[tokio::test]
2115    async fn test_block_info_as_committed() {
2116        let num_authorities: u32 = 4;
2117        let (context, _) = Context::new_for_test(num_authorities as usize);
2118        let context = Arc::new(context);
2119
2120        let store = Arc::new(MemStore::new());
2121        let mut dag_state = DagState::new(context.clone(), store.clone());
2122
2123        // Accept a block
2124        let block = VerifiedBlock::new_for_test(
2125            TestBlock::new(1, 0)
2126                .set_timestamp_ms(1000)
2127                .set_ancestors(vec![])
2128                .build(),
2129        );
2130
2131        dag_state.accept_block(block.clone());
2132
2133        // Query is committed
2134        assert!(!dag_state.is_committed(&block.reference()));
2135
2136        // Set block as committed for first time should return true
2137        assert!(
2138            dag_state.set_committed(&block.reference()),
2139            "Block should be successfully set as committed for first time"
2140        );
2141
2142        // Now it should appear as committed
2143        assert!(dag_state.is_committed(&block.reference()));
2144
2145        // Trying to set the block as committed again, it should return false.
2146        assert!(
2147            !dag_state.set_committed(&block.reference()),
2148            "Block should not be successfully set as committed"
2149        );
2150    }
2151
2152    #[tokio::test]
2153    async fn test_get_cached_blocks() {
2154        let (mut context, _) = Context::new_for_test(4);
2155        context.parameters.dag_state_cached_rounds = 5;
2156
2157        let context = Arc::new(context);
2158        let store = Arc::new(MemStore::new());
2159        let mut dag_state = DagState::new(context.clone(), store.clone());
2160
2161        // Create no blocks for authority 0
2162        // Create one block (round 10) for authority 1
2163        // Create two blocks (rounds 10,11) for authority 2
2164        // Create three blocks (rounds 10,11,12) for authority 3
2165        let mut all_blocks = Vec::new();
2166        for author in 1..=3 {
2167            for round in 10..(10 + author) {
2168                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2169                all_blocks.push(block.clone());
2170                dag_state.accept_block(block);
2171            }
2172        }
2173
2174        // Test get_cached_blocks()
2175
2176        let cached_blocks =
2177            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2178        assert!(cached_blocks.is_empty());
2179
2180        let cached_blocks =
2181            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2182        assert_eq!(cached_blocks.len(), 1);
2183        assert_eq!(cached_blocks[0].round(), 10);
2184
2185        let cached_blocks =
2186            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2187        assert_eq!(cached_blocks.len(), 2);
2188        assert_eq!(cached_blocks[0].round(), 10);
2189        assert_eq!(cached_blocks[1].round(), 11);
2190
2191        let cached_blocks =
2192            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2193        assert_eq!(cached_blocks.len(), 1);
2194        assert_eq!(cached_blocks[0].round(), 11);
2195
2196        let cached_blocks =
2197            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2198        assert_eq!(cached_blocks.len(), 3);
2199        assert_eq!(cached_blocks[0].round(), 10);
2200        assert_eq!(cached_blocks[1].round(), 11);
2201        assert_eq!(cached_blocks[2].round(), 12);
2202
2203        let cached_blocks =
2204            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2205        assert_eq!(cached_blocks.len(), 1);
2206        assert_eq!(cached_blocks[0].round(), 12);
2207
2208        // Test get_cached_blocks_in_range()
2209
2210        // Start == end
2211        let cached_blocks = dag_state.get_cached_blocks_in_range(
2212            context.committee.to_authority_index(3).unwrap(),
2213            10,
2214            10,
2215            1,
2216        );
2217        assert!(cached_blocks.is_empty());
2218
2219        // Start > end
2220        let cached_blocks = dag_state.get_cached_blocks_in_range(
2221            context.committee.to_authority_index(3).unwrap(),
2222            11,
2223            10,
2224            1,
2225        );
2226        assert!(cached_blocks.is_empty());
2227
2228        // Empty result.
2229        let cached_blocks = dag_state.get_cached_blocks_in_range(
2230            context.committee.to_authority_index(0).unwrap(),
2231            9,
2232            10,
2233            1,
2234        );
2235        assert!(cached_blocks.is_empty());
2236
2237        // Single block, one round before the end.
2238        let cached_blocks = dag_state.get_cached_blocks_in_range(
2239            context.committee.to_authority_index(1).unwrap(),
2240            9,
2241            11,
2242            1,
2243        );
2244        assert_eq!(cached_blocks.len(), 1);
2245        assert_eq!(cached_blocks[0].round(), 10);
2246
2247        // Respect end round.
2248        let cached_blocks = dag_state.get_cached_blocks_in_range(
2249            context.committee.to_authority_index(2).unwrap(),
2250            9,
2251            12,
2252            5,
2253        );
2254        assert_eq!(cached_blocks.len(), 2);
2255        assert_eq!(cached_blocks[0].round(), 10);
2256        assert_eq!(cached_blocks[1].round(), 11);
2257
2258        // Respect start round.
2259        let cached_blocks = dag_state.get_cached_blocks_in_range(
2260            context.committee.to_authority_index(3).unwrap(),
2261            11,
2262            20,
2263            5,
2264        );
2265        assert_eq!(cached_blocks.len(), 2);
2266        assert_eq!(cached_blocks[0].round(), 11);
2267        assert_eq!(cached_blocks[1].round(), 12);
2268
2269        // Respect limit
2270        let cached_blocks = dag_state.get_cached_blocks_in_range(
2271            context.committee.to_authority_index(3).unwrap(),
2272            10,
2273            20,
2274            1,
2275        );
2276        assert_eq!(cached_blocks.len(), 1);
2277        assert_eq!(cached_blocks[0].round(), 10);
2278    }
2279
2280    #[tokio::test]
2281    async fn test_get_last_cached_block() {
2282        // GIVEN
2283        const CACHED_ROUNDS: Round = 2;
2284        const GC_DEPTH: u32 = 1;
2285        let (mut context, _) = Context::new_for_test(4);
2286        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2287        context
2288            .protocol_config
2289            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2290
2291        let context = Arc::new(context);
2292        let store = Arc::new(MemStore::new());
2293        let mut dag_state = DagState::new(context.clone(), store.clone());
2294
2295        // Create no blocks for authority 0
2296        // Create one block (round 1) for authority 1
2297        // Create two blocks (rounds 1,2) for authority 2
2298        // Create three blocks (rounds 1,2,3) for authority 3
2299        let dag_str = "DAG {
2300            Round 0 : { 4 },
2301            Round 1 : {
2302                B -> [*],
2303                C -> [*],
2304                D -> [*],
2305            },
2306            Round 2 : {
2307                C -> [*],
2308                D -> [*],
2309            },
2310            Round 3 : {
2311                D -> [*],
2312            },
2313        }";
2314
2315        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2316
2317        // Add equivocating block for round 2 authority 3
2318        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2319
2320        // Accept all blocks
2321        for block in dag_builder
2322            .all_blocks()
2323            .into_iter()
2324            .chain(std::iter::once(block))
2325        {
2326            dag_state.accept_block(block);
2327        }
2328
2329        dag_state.add_commit(TrustedCommit::new_for_test(
2330            1 as CommitIndex,
2331            CommitDigest::MIN,
2332            context.clock.timestamp_utc_ms(),
2333            dag_builder.leader_block(3).unwrap().reference(),
2334            vec![],
2335        ));
2336
2337        // WHEN search for the latest blocks
2338        let end_round = 4;
2339        let expected_rounds = vec![0, 1, 2, 3];
2340        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2341        // THEN
2342        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2343        assert_eq!(
2344            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2345            expected_rounds
2346        );
2347        assert_eq!(
2348            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2349            expected_excluded_and_equivocating_blocks
2350        );
2351
2352        // THEN
2353        for (i, expected_round) in expected_rounds.iter().enumerate() {
2354            let round = dag_state
2355                .get_last_cached_block_in_range(
2356                    context.committee.to_authority_index(i).unwrap(),
2357                    0,
2358                    end_round,
2359                )
2360                .map(|b| b.round())
2361                .unwrap_or_default();
2362            assert_eq!(round, *expected_round, "Authority {i}");
2363        }
2364
2365        // WHEN starting from round 2
2366        let start_round = 2;
2367        let expected_rounds = [0, 0, 2, 3];
2368
2369        // THEN
2370        for (i, expected_round) in expected_rounds.iter().enumerate() {
2371            let round = dag_state
2372                .get_last_cached_block_in_range(
2373                    context.committee.to_authority_index(i).unwrap(),
2374                    start_round,
2375                    end_round,
2376                )
2377                .map(|b| b.round())
2378                .unwrap_or_default();
2379            assert_eq!(round, *expected_round, "Authority {i}");
2380        }
2381
2382        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
2383        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
2384        //
2385        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
2386        // all their highest round blocks for CACHED_ROUNDS.
2387        dag_state.flush();
2388
2389        // AND we request before round 3
2390        let end_round = 3;
2391        let expected_rounds = vec![0, 1, 2, 2];
2392
2393        // THEN
2394        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2395        assert_eq!(
2396            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2397            expected_rounds
2398        );
2399
2400        // THEN
2401        for (i, expected_round) in expected_rounds.iter().enumerate() {
2402            let round = dag_state
2403                .get_last_cached_block_in_range(
2404                    context.committee.to_authority_index(i).unwrap(),
2405                    0,
2406                    end_round,
2407                )
2408                .map(|b| b.round())
2409                .unwrap_or_default();
2410            assert_eq!(round, *expected_round, "Authority {i}");
2411        }
2412    }
2413
2414    #[tokio::test]
2415    #[should_panic(
2416        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2417    )]
2418    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2419        // GIVEN
2420        const CACHED_ROUNDS: Round = 1;
2421        const GC_DEPTH: u32 = 1;
2422        let (mut context, _) = Context::new_for_test(4);
2423        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2424        context
2425            .protocol_config
2426            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2427
2428        let context = Arc::new(context);
2429        let store = Arc::new(MemStore::new());
2430        let mut dag_state = DagState::new(context.clone(), store.clone());
2431
2432        // Create no blocks for authority 0
2433        // Create one block (round 1) for authority 1
2434        // Create two blocks (rounds 1,2) for authority 2
2435        // Create three blocks (rounds 1,2,3) for authority 3
2436        let mut dag_builder = DagBuilder::new(context.clone());
2437        dag_builder
2438            .layers(1..=1)
2439            .authorities(vec![AuthorityIndex::new_for_test(0)])
2440            .skip_block()
2441            .build();
2442        dag_builder
2443            .layers(2..=2)
2444            .authorities(vec![
2445                AuthorityIndex::new_for_test(0),
2446                AuthorityIndex::new_for_test(1),
2447            ])
2448            .skip_block()
2449            .build();
2450        dag_builder
2451            .layers(3..=3)
2452            .authorities(vec![
2453                AuthorityIndex::new_for_test(0),
2454                AuthorityIndex::new_for_test(1),
2455                AuthorityIndex::new_for_test(2),
2456            ])
2457            .skip_block()
2458            .build();
2459
2460        // Accept all blocks
2461        for block in dag_builder.all_blocks() {
2462            dag_state.accept_block(block);
2463        }
2464
2465        dag_state.add_commit(TrustedCommit::new_for_test(
2466            1 as CommitIndex,
2467            CommitDigest::MIN,
2468            0,
2469            dag_builder.leader_block(3).unwrap().reference(),
2470            vec![],
2471        ));
2472
2473        // Flush the store so we update the evict rounds
2474        dag_state.flush();
2475
2476        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2477        dag_state.get_last_cached_block_per_authority(2);
2478    }
2479
2480    #[tokio::test]
2481    async fn test_last_quorum() {
2482        // GIVEN
2483        let (context, _) = Context::new_for_test(4);
2484        let context = Arc::new(context);
2485        let store = Arc::new(MemStore::new());
2486        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2487
2488        // WHEN no blocks exist then genesis should be returned
2489        {
2490            let genesis = genesis_blocks(context.as_ref());
2491
2492            assert_eq!(dag_state.read().last_quorum(), genesis);
2493        }
2494
2495        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2496        {
2497            let mut dag_builder = DagBuilder::new(context.clone());
2498            dag_builder
2499                .layers(1..=4)
2500                .build()
2501                .persist_layers(dag_state.clone());
2502            let round_4_blocks: Vec<_> = dag_builder
2503                .blocks(4..=4)
2504                .into_iter()
2505                .map(|block| block.reference())
2506                .collect();
2507
2508            let last_quorum = dag_state.read().last_quorum();
2509
2510            assert_eq!(
2511                last_quorum
2512                    .into_iter()
2513                    .map(|block| block.reference())
2514                    .collect::<Vec<_>>(),
2515                round_4_blocks
2516            );
2517        }
2518
2519        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2520        {
2521            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2522            dag_state.write().accept_block(block);
2523
2524            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2525
2526            let last_quorum = dag_state.read().last_quorum();
2527
2528            assert_eq!(last_quorum, round_4_blocks);
2529        }
2530    }
2531
2532    #[tokio::test]
2533    async fn test_last_block_for_authority() {
2534        // GIVEN
2535        let (context, _) = Context::new_for_test(4);
2536        let context = Arc::new(context);
2537        let store = Arc::new(MemStore::new());
2538        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2539
2540        // WHEN no blocks exist then genesis should be returned
2541        {
2542            let genesis = genesis_blocks(context.as_ref());
2543            let my_genesis = genesis
2544                .into_iter()
2545                .find(|block| block.author() == context.own_index)
2546                .unwrap();
2547
2548            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2549        }
2550
2551        // WHEN adding some blocks for authorities, only the last ones should be returned
2552        {
2553            // add blocks up to round 4
2554            let mut dag_builder = DagBuilder::new(context.clone());
2555            dag_builder
2556                .layers(1..=4)
2557                .build()
2558                .persist_layers(dag_state.clone());
2559
2560            // add block 5 for authority 0
2561            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2562            dag_state.write().accept_block(block);
2563
2564            let block = dag_state
2565                .read()
2566                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2567            assert_eq!(block.round(), 5);
2568
2569            for (authority_index, _) in context.committee.authorities() {
2570                let block = dag_state
2571                    .read()
2572                    .get_last_block_for_authority(authority_index);
2573
2574                if authority_index.value() == 0 {
2575                    assert_eq!(block.round(), 5);
2576                } else {
2577                    assert_eq!(block.round(), 4);
2578                }
2579            }
2580        }
2581    }
2582
2583    #[tokio::test]
2584    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2585        // GIVEN
2586        let (context, _) = Context::new_for_test(4);
2587        let context = Arc::new(context);
2588        let store = Arc::new(MemStore::new());
2589        let mut dag_state = DagState::new(context.clone(), store.clone());
2590
2591        // Set a timestamp for the block that is ahead of the current time
2592        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2593
2594        let block = VerifiedBlock::new_for_test(
2595            TestBlock::new(10, 0)
2596                .set_timestamp_ms(block_timestamp)
2597                .build(),
2598        );
2599
2600        // Try to accept the block - it should not panic
2601        dag_state.accept_block(block);
2602    }
2603
2604    #[tokio::test]
2605    async fn test_last_finalized_commit() {
2606        // GIVEN
2607        let (context, _) = Context::new_for_test(4);
2608        let context = Arc::new(context);
2609        let store = Arc::new(MemStore::new());
2610        let mut dag_state = DagState::new(context.clone(), store.clone());
2611
2612        // WHEN adding a finalized commit
2613        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2614        let rejected_transactions = BTreeMap::new();
2615        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2616
2617        // THEN the commit should be added to the buffer
2618        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2619        assert_eq!(
2620            dag_state.finalized_commits_to_write[0],
2621            (commit_ref, rejected_transactions.clone())
2622        );
2623
2624        // WHEN flushing the DAG state
2625        dag_state.flush();
2626
2627        // THEN the commit and rejected transactions should be written to storage
2628        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2629        assert_eq!(last_finalized_commit, Some(commit_ref));
2630        let stored_rejected_transactions = store
2631            .read_rejected_transactions(commit_ref)
2632            .unwrap()
2633            .unwrap();
2634        assert_eq!(stored_rejected_transactions, rejected_transactions);
2635    }
2636}