consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::{AuthorityIndex, Stake};
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use mysten_common::ZipDebugEqIteratorExt;
18use tokio::time::Instant;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22    CommittedSubDag,
23    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
24    commit::{
25        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
26        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
27    },
28    context::Context,
29    leader_scoring::{ReputationScores, ScoringSubdag},
30    storage::{Store, WriteBatch},
31    threshold_clock::ThresholdClock,
32};
33
34/// DagState provides the API to write and read accepted blocks from the DAG.
35/// Only uncommitted and last committed blocks are cached in memory.
36/// The rest of blocks are stored on disk.
37/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
38///
39/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
40/// concurrent access from multiple components.
41pub struct DagState {
42    context: Arc<Context>,
43
44    // The genesis blocks
45    genesis: BTreeMap<BlockRef, VerifiedBlock>,
46
47    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
48    // Note: all uncommitted blocks are kept in memory.
49    //
50    // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always have available
51    // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS.
52    // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`.
53    // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached.
54    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
55
56    // Indexes recent block refs by their authorities.
57    // Vec position corresponds to the authority index.
58    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
59
60    // Per-round aggregation of accepted blocks. Front holds the oldest retained
61    // round (`gc_round + 1` after eviction); back holds `highest_accepted_round`.
62    // Rounds between front and back are always contiguous in a valid DagState.
63    round_info: VecDeque<RoundInfo>,
64
65    // Keeps track of the threshold clock for proposing blocks.
66    threshold_clock: ThresholdClock,
67
68    // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
69    // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear.
70    // The `evicted_rounds` size should be the same as the committee size.
71    evicted_rounds: Vec<Round>,
72
73    // Highest round of blocks accepted.
74    highest_accepted_round: Round,
75
76    // Last consensus commit of the dag.
77    last_commit: Option<TrustedCommit>,
78
79    // Last wall time when commit round advanced. Does not persist across restarts.
80    last_commit_round_advancement_time: Option<std::time::Instant>,
81
82    // Last committed rounds per authority.
83    last_committed_rounds: Vec<Round>,
84
85    /// The committed subdags that have been scored but scores have not been used
86    /// for leader schedule yet.
87    scoring_subdag: ScoringSubdag,
88
89    // Commit votes pending to be included in new blocks.
90    // TODO: limit to 1st commit per round with multi-leader.
91    // TODO: recover unproposed pending commit votes at startup.
92    pending_commit_votes: VecDeque<CommitVote>,
93
94    // Blocks and commits must be buffered for persistence before they can be
95    // inserted into the local DAG or sent to output.
96    blocks_to_write: Vec<VerifiedBlock>,
97    commits_to_write: Vec<TrustedCommit>,
98
99    // Buffers the reputation scores & last_committed_rounds to be flushed with the
100    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
101    // & last_committed_rounds from the commits as needed.
102    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
103
104    // Buffers finalized commits and their rejected transactions to be written to storage.
105    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
106
107    // Persistent storage for blocks, commits and other consensus data.
108    store: Arc<dyn Store>,
109
110    // The number of cached rounds
111    cached_rounds: Round,
112}
113
114impl DagState {
115    /// Initializes DagState from storage.
116    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
117        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
118        let num_authorities = context.committee.size();
119
120        let genesis = genesis_blocks(context.as_ref())
121            .into_iter()
122            .map(|block| (block.reference(), block))
123            .collect();
124
125        let threshold_clock = ThresholdClock::new(1, context.clone());
126
127        let last_commit = store
128            .read_last_commit()
129            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
130
131        let commit_info = store
132            .read_last_commit_info()
133            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
134        let (mut last_committed_rounds, commit_recovery_start_index) =
135            if let Some((commit_ref, commit_info)) = commit_info {
136                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
137                (commit_info.committed_rounds, commit_ref.index + 1)
138            } else {
139                tracing::info!("Found no stored CommitInfo to recover from");
140                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
141            };
142
143        let mut unscored_committed_subdags = Vec::new();
144        let mut scoring_subdag = ScoringSubdag::new(context.clone());
145
146        if let Some(last_commit) = last_commit.as_ref() {
147            store
148                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
149                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
150                .iter()
151                .for_each(|commit| {
152                    for block_ref in commit.blocks() {
153                        last_committed_rounds[block_ref.author] =
154                            max(last_committed_rounds[block_ref.author], block_ref.round);
155                    }
156                    let committed_subdag =
157                        load_committed_subdag_from_store(store.as_ref(), commit.clone());
158                    unscored_committed_subdags.push(committed_subdag);
159                });
160        }
161
162        tracing::info!(
163            "DagState was initialized with the following state: \
164            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
165            unscored_committed_subdags.len()
166        );
167
168        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
169
170        let mut state = Self {
171            context: context.clone(),
172            genesis,
173            recent_blocks: BTreeMap::new(),
174            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
175            round_info: VecDeque::new(),
176            threshold_clock,
177            highest_accepted_round: 0,
178            last_commit: last_commit.clone(),
179            last_commit_round_advancement_time: None,
180            last_committed_rounds: last_committed_rounds.clone(),
181            pending_commit_votes: VecDeque::new(),
182            blocks_to_write: vec![],
183            commits_to_write: vec![],
184            commit_info_to_write: vec![],
185            finalized_commits_to_write: vec![],
186            scoring_subdag,
187            store: store.clone(),
188            cached_rounds,
189            evicted_rounds: vec![0; num_authorities],
190        };
191
192        let mut recovered_blocks = Vec::new();
193        for (authority_index, _) in context.committee.authorities() {
194            let (blocks, eviction_round) = {
195                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
196                // As reminder, the eviction round is taking into account the gc_round.
197                let last_block = state
198                    .store
199                    .scan_last_blocks_by_author(authority_index, 1, None)
200                    .expect("Database error");
201                let last_block_round = last_block
202                    .last()
203                    .map(|b| b.round())
204                    .unwrap_or(GENESIS_ROUND);
205
206                let eviction_round =
207                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
208                let blocks = state
209                    .store
210                    .scan_blocks_by_author(authority_index, eviction_round + 1)
211                    .expect("Database error");
212
213                (blocks, eviction_round)
214            };
215
216            debug!(
217                "Recovered blocks {}: {:?}",
218                authority_index,
219                blocks
220                    .iter()
221                    .map(|b| b.reference())
222                    .collect::<Vec<BlockRef>>()
223            );
224            recovered_blocks.extend(blocks);
225
226            state.evicted_rounds[authority_index] = eviction_round;
227        }
228
229        // Update the block metadata across all recovered blocks from lowest round to highest round.
230        recovered_blocks.sort_by_key(|b| b.reference());
231        for block in &recovered_blocks {
232            state.update_block_metadata(block);
233        }
234
235        if let Some(last_commit) = last_commit {
236            let mut index = last_commit.index();
237            let gc_round = state.gc_round();
238            info!(
239                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
240                index, gc_round
241            );
242
243            loop {
244                let commits = store
245                    .scan_commits((index..=index).into())
246                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
247                let Some(commit) = commits.first() else {
248                    info!("Recovering finished up to index {index}, no more commits to recover");
249                    break;
250                };
251
252                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
253                if gc_round > 0 && commit.leader().round <= gc_round {
254                    info!(
255                        "Recovering finished, reached commit leader round {} <= gc_round {}",
256                        commit.leader().round,
257                        gc_round
258                    );
259                    break;
260                }
261
262                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
263                    debug!(
264                        "Setting block {:?} as committed based on commit {:?}",
265                        block_ref,
266                        commit.index()
267                    );
268                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
269                });
270
271                // All commits are indexed starting from 1, so one reach zero exit.
272                index = index.saturating_sub(1);
273                if index == 0 {
274                    break;
275                }
276            }
277        }
278
279        // Recover hard linked statuses for blocks within GC round.
280        let proposed_blocks = store
281            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
282            .expect("Database error");
283        for block in proposed_blocks {
284            state.link_causal_history(block.reference());
285        }
286
287        state
288    }
289
290    /// Accepts a block into DagState and keeps it in memory.
291    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
292        assert_ne!(
293            block.round(),
294            0,
295            "Genesis block should not be accepted into DAG."
296        );
297
298        let block_ref = block.reference();
299        if self.contains_block(&block_ref) {
300            return;
301        }
302
303        let now = self.context.clock.timestamp_utc_ms();
304        if block.timestamp_ms() > now {
305            trace!(
306                "Block {:?} with timestamp {} is greater than local timestamp {}.",
307                block,
308                block.timestamp_ms(),
309                now,
310            );
311        }
312        let hostname = &self.context.committee.authority(block_ref.author).hostname;
313        self.context
314            .metrics
315            .node_metrics
316            .accepted_block_time_drift_ms
317            .with_label_values(&[hostname])
318            .inc_by(block.timestamp_ms().saturating_sub(now));
319
320        // TODO: Move this check to core
321        // Ensure we don't write multiple blocks per slot for our own index
322        if block_ref.author == self.context.own_index {
323            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
324            if !self
325                .context
326                .parameters
327                .internal
328                .skip_equivocation_validation
329            {
330                assert!(
331                    existing_blocks.is_empty(),
332                    "Block Rejected! Attempted to add block {block:#?} to own slot where \
333                    block(s) {existing_blocks:#?} already exists."
334                );
335            }
336        }
337        self.update_block_metadata(&block);
338        self.blocks_to_write.push(block);
339        let source = if self.context.own_index == block_ref.author {
340            "own"
341        } else {
342            "others"
343        };
344        self.context
345            .metrics
346            .node_metrics
347            .accepted_blocks
348            .with_label_values(&[source])
349            .inc();
350    }
351
352    /// Updates internal metadata for a block.
353    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
354        let block_ref = block.reference();
355        self.recent_blocks
356            .insert(block_ref, BlockInfo::new(block.clone()));
357        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
358        if self.context.protocol_config.enable_v3() {
359            // Update votes accounting in BlockInfo.
360            for ancestor in block.ancestors() {
361                // Only update children info when the link is potentially a leader vote.
362                if ancestor.round + 1 != block_ref.round || ancestor.round <= self.gc_round() {
363                    continue;
364                }
365                let block_info = self.recent_blocks.get_mut(ancestor).unwrap_or_else(|| {
366                    panic!(
367                        "Parent block {} of block {} does not exist",
368                        ancestor, block_ref
369                    )
370                });
371                if block_info.children.insert(block_ref)
372                    && block_info.children_authorities.insert(block_ref.author)
373                {
374                    let child_stake = self.context.committee.stake(block_ref.author);
375                    block_info.total_children_stake += child_stake;
376                }
377            }
378            self.update_round_info(block);
379        }
380        if self.threshold_clock.add_block(block_ref) {
381            // Do not measure quorum delay when no local block is proposed in the round.
382            if let Some(last_proposed_block) = self.get_last_proposed_block()
383                && last_proposed_block.round() == block_ref.round
384            {
385                let quorum_delay_ms = self
386                    .context
387                    .clock
388                    .timestamp_utc_ms()
389                    .saturating_sub(last_proposed_block.timestamp_ms());
390                self.context
391                    .metrics
392                    .node_metrics
393                    .quorum_receive_latency
394                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
395            }
396        }
397
398        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
399        self.context
400            .metrics
401            .node_metrics
402            .highest_accepted_round
403            .set(self.highest_accepted_round as i64);
404
405        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
406            .last()
407            .map(|block_ref| block_ref.round)
408            .expect("There should be by now at least one block ref");
409        let hostname = &self.context.committee.authority(block_ref.author).hostname;
410        self.context
411            .metrics
412            .node_metrics
413            .highest_accepted_authority_round
414            .with_label_values(&[hostname])
415            .set(highest_accepted_round_for_author as i64);
416    }
417
418    /// Accepts a blocks into DagState and keeps it in memory.
419    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
420        debug!(
421            "Accepting blocks: {}",
422            blocks.iter().map(|b| b.reference().to_string()).join(",")
423        );
424        for block in blocks {
425            self.accept_block(block);
426        }
427    }
428
429    /// Gets a block by checking cached recent blocks then storage.
430    /// Returns None when the block is not found.
431    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
432        self.get_blocks(&[*reference])
433            .pop()
434            .expect("Exactly one element should be returned")
435    }
436
437    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
438    /// An element is None when the corresponding block is not found.
439    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
440        if block_refs.is_empty() {
441            return vec![];
442        }
443
444        let mut blocks = vec![None; block_refs.len()];
445        let mut missing = Vec::new();
446
447        for (index, block_ref) in block_refs.iter().enumerate() {
448            if block_ref.round == GENESIS_ROUND {
449                // Allow the caller to handle the invalid genesis ancestor error.
450                if let Some(block) = self.genesis.get(block_ref) {
451                    blocks[index] = Some(block.clone());
452                }
453                continue;
454            }
455            if let Some(block_info) = self.recent_blocks.get(block_ref) {
456                blocks[index] = Some(block_info.block.clone());
457                continue;
458            }
459            missing.push((index, block_ref));
460        }
461
462        if missing.is_empty() {
463            return blocks;
464        }
465
466        let missing_refs = missing
467            .iter()
468            .map(|(_, block_ref)| **block_ref)
469            .collect::<Vec<_>>();
470        let store_results = self
471            .store
472            .read_blocks(&missing_refs)
473            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
474        self.context
475            .metrics
476            .node_metrics
477            .dag_state_store_read_count
478            .with_label_values(&["get_blocks"])
479            .inc();
480
481        for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
482            blocks[index] = result;
483        }
484
485        blocks
486    }
487
488    /// Gets all uncommitted blocks in a slot.
489    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
490    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
491        // TODO: either panic below when the slot is at or below the last committed round,
492        // or support reading from storage while limiting storage reads to edge cases.
493
494        let mut blocks = vec![];
495        for (_block_ref, block_info) in self.recent_blocks.range((
496            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
497            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
498        )) {
499            blocks.push(block_info.block.clone())
500        }
501        blocks
502    }
503
504    /// Gets all uncommitted blocks in a round.
505    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
506    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
507        if round <= self.last_commit_round() {
508            panic!("Round {} have committed blocks!", round);
509        }
510
511        let mut blocks = vec![];
512        for (_block_ref, block_info) in self.recent_blocks.range((
513            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
514            Excluded(BlockRef::new(
515                round + 1,
516                AuthorityIndex::ZERO,
517                BlockDigest::MIN,
518            )),
519        )) {
520            blocks.push(block_info.block.clone())
521        }
522        blocks
523    }
524
525    #[cfg(test)]
526    pub(crate) fn get_block_children(&self, block_ref: &BlockRef) -> Option<Vec<BlockRef>> {
527        if block_ref.round <= self.gc_round() {
528            return None;
529        }
530        self.recent_blocks
531            .get(block_ref)
532            .map(|block_info| block_info.children.iter().cloned().collect())
533    }
534
535    #[cfg(test)]
536    pub(crate) fn get_block_children_authorities(
537        &self,
538        block_ref: &BlockRef,
539    ) -> Option<BTreeSet<AuthorityIndex>> {
540        if block_ref.round <= self.gc_round() {
541            return None;
542        }
543        self.recent_blocks
544            .get(block_ref)
545            .map(|block_info| block_info.children_authorities.clone())
546    }
547
548    #[cfg(test)]
549    pub(crate) fn get_block_total_children_stake(&self, block_ref: &BlockRef) -> Option<Stake> {
550        if block_ref.round <= self.gc_round() {
551            return None;
552        }
553        self.recent_blocks
554            .get(block_ref)
555            .map(|block_info| block_info.total_children_stake)
556    }
557
558    /// Gets all ancestors in the history of a block at a certain round.
559    pub(crate) fn ancestors_at_round(
560        &self,
561        later_block: &VerifiedBlock,
562        earlier_round: Round,
563    ) -> Vec<VerifiedBlock> {
564        // Iterate through ancestors of later_block in round descending order.
565        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
566        while !linked.is_empty() {
567            let round = linked.last().unwrap().round;
568            // Stop after finishing traversal for ancestors above earlier_round.
569            if round <= earlier_round {
570                break;
571            }
572            let block_ref = linked.pop_last().unwrap();
573            let Some(block) = self.get_block(&block_ref) else {
574                panic!("Block {:?} should exist in DAG!", block_ref);
575            };
576            linked.extend(block.ancestors().iter().cloned());
577        }
578        linked
579            .range((
580                Included(BlockRef::new(
581                    earlier_round,
582                    AuthorityIndex::ZERO,
583                    BlockDigest::MIN,
584                )),
585                Unbounded,
586            ))
587            .map(|r| {
588                self.get_block(r)
589                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
590                    .clone()
591            })
592            .collect()
593    }
594
595    /// Gets the last proposed block from this authority.
596    /// If no block is proposed yet, returns the genesis block.
597    /// If the node is an observer, returns None.
598    pub(crate) fn get_last_proposed_block(&self) -> Option<VerifiedBlock> {
599        if self.context.is_validator() {
600            Some(self.get_last_block_for_authority(self.context.own_index))
601        } else {
602            None
603        }
604    }
605
606    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
607    /// then the genesis block is returned as no other block has been received from that authority.
608    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
609        if let Some(last) = self.recent_refs_by_authority[authority].last() {
610            return self
611                .recent_blocks
612                .get(last)
613                .expect("Block should be found in recent blocks")
614                .block
615                .clone();
616        }
617
618        // if none exists, then fallback to genesis
619        let (_, genesis_block) = self
620            .genesis
621            .iter()
622            .find(|(block_ref, _)| block_ref.author == authority)
623            .expect("Genesis should be found for authority {authority_index}");
624        genesis_block.clone()
625    }
626
627    /// Returns cached recent blocks from the specified authority.
628    /// Blocks returned are limited to round >= `start`, and cached.
629    /// NOTE: caller should not assume returned blocks are always chained.
630    /// "Disconnected" blocks can be returned when there are byzantine blocks,
631    /// or a previously evicted block is accepted again.
632    pub(crate) fn get_cached_blocks(
633        &self,
634        authority: AuthorityIndex,
635        start: Round,
636    ) -> Vec<VerifiedBlock> {
637        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
638    }
639
640    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
641    // limited in total number of blocks.
642    pub(crate) fn get_cached_blocks_in_range(
643        &self,
644        authority: AuthorityIndex,
645        start_round: Round,
646        end_round: Round,
647        limit: usize,
648    ) -> Vec<VerifiedBlock> {
649        if start_round >= end_round || limit == 0 {
650            return vec![];
651        }
652
653        let mut blocks = vec![];
654        for block_ref in self.recent_refs_by_authority[authority].range((
655            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
656            Excluded(BlockRef::new(
657                end_round,
658                AuthorityIndex::MIN,
659                BlockDigest::MIN,
660            )),
661        )) {
662            let block_info = self
663                .recent_blocks
664                .get(block_ref)
665                .expect("Block should exist in recent blocks");
666            blocks.push(block_info.block.clone());
667            if blocks.len() >= limit {
668                break;
669            }
670        }
671        blocks
672    }
673
674    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
675    pub(crate) fn get_last_cached_block_in_range(
676        &self,
677        authority: AuthorityIndex,
678        start_round: Round,
679        end_round: Round,
680    ) -> Option<VerifiedBlock> {
681        if start_round >= end_round {
682            return None;
683        }
684
685        let block_ref = self.recent_refs_by_authority[authority]
686            .range((
687                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
688                Excluded(BlockRef::new(
689                    end_round,
690                    AuthorityIndex::MIN,
691                    BlockDigest::MIN,
692                )),
693            ))
694            .last()?;
695
696        self.recent_blocks
697            .get(block_ref)
698            .map(|block_info| block_info.block.clone())
699    }
700
701    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
702    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
703    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
704    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
705    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
706    /// and the other equivocating blocks will be returned.
707    pub(crate) fn get_last_cached_block_per_authority(
708        &self,
709        end_round: Round,
710    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
711        // Initialize with the genesis blocks as fallback
712        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
713        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
714
715        if end_round == GENESIS_ROUND {
716            panic!(
717                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
718            );
719        }
720
721        if end_round == GENESIS_ROUND + 1 {
722            return blocks.into_iter().map(|b| (b, vec![])).collect();
723        }
724
725        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
726            let authority_index = self
727                .context
728                .committee
729                .to_authority_index(authority_index)
730                .unwrap();
731
732            let last_evicted_round = self.evicted_rounds[authority_index];
733            if end_round.saturating_sub(1) <= last_evicted_round {
734                panic!(
735                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
736                );
737            }
738
739            let block_ref_iter = block_refs
740                .range((
741                    Included(BlockRef::new(
742                        last_evicted_round + 1,
743                        authority_index,
744                        BlockDigest::MIN,
745                    )),
746                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
747                ))
748                .rev();
749
750            let mut last_round = 0;
751            for block_ref in block_ref_iter {
752                if last_round == 0 {
753                    last_round = block_ref.round;
754                    let block_info = self
755                        .recent_blocks
756                        .get(block_ref)
757                        .expect("Block should exist in recent blocks");
758                    blocks[authority_index] = block_info.block.clone();
759                    continue;
760                }
761                if block_ref.round < last_round {
762                    break;
763                }
764                equivocating_blocks[authority_index].push(*block_ref);
765            }
766        }
767
768        blocks
769            .into_iter()
770            .zip_debug_eq(equivocating_blocks)
771            .collect()
772    }
773
774    /// Checks whether a block exists in the slot. The method checks only against the cached data.
775    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
776    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
777        // Always return true for genesis slots.
778        if slot.round == GENESIS_ROUND {
779            return true;
780        }
781
782        let eviction_round = self.evicted_rounds[slot.authority];
783        if slot.round <= eviction_round {
784            panic!(
785                "{}",
786                format!(
787                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
788                )
789            );
790        }
791
792        let mut result = self.recent_refs_by_authority[slot.authority].range((
793            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
794            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
795        ));
796        result.next().is_some()
797    }
798
799    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
800    /// back the results, so its expensive if keep asking for cache missing blocks.
801    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
802        let mut exist = vec![false; block_refs.len()];
803        let mut missing = Vec::new();
804
805        for (index, block_ref) in block_refs.into_iter().enumerate() {
806            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
807            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
808                exist[index] = true;
809            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
810            {
811                // Optimization: recent_refs contain the most recent blocks known to this authority.
812                // If a block ref is not found there and has a higher round, it definitely is
813                // missing from this authority and there is no need to check disk.
814                exist[index] = false;
815            } else {
816                missing.push((index, block_ref));
817            }
818        }
819
820        if missing.is_empty() {
821            return exist;
822        }
823
824        let missing_refs = missing
825            .iter()
826            .map(|(_, block_ref)| *block_ref)
827            .collect::<Vec<_>>();
828        let store_results = self
829            .store
830            .contains_blocks(&missing_refs)
831            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
832        self.context
833            .metrics
834            .node_metrics
835            .dag_state_store_read_count
836            .with_label_values(&["contains_blocks"])
837            .inc();
838
839        for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
840            exist[index] = result;
841        }
842
843        exist
844    }
845
846    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
847        let blocks = self.contains_blocks(vec![*block_ref]);
848        blocks.first().cloned().unwrap()
849    }
850
851    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
852    // Method will panic if the block is not found in the cache.
853    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
854        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
855            if !block_info.committed {
856                block_info.committed = true;
857                return true;
858            }
859            false
860        } else {
861            panic!(
862                "Block {:?} not found in cache to set as committed.",
863                block_ref
864            );
865        }
866    }
867
868    /// Returns true if the block is committed. Only valid for blocks above the GC round.
869    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
870        self.recent_blocks
871            .get(block_ref)
872            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
873            .committed
874    }
875
876    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
877    /// Returns the list of blocks that are newly linked.
878    /// The returned blocks are guaranteed to be above the GC round.
879    /// Transaction votes for the returned blocks are retrieved and carried by the upcoming
880    /// proposed block.
881    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
882        let gc_round = self.gc_round();
883        let mut linked_blocks = vec![];
884        let mut targets = VecDeque::new();
885        targets.push_back(root_block);
886        while let Some(block_ref) = targets.pop_front() {
887            // No need to collect or mark blocks at or below GC round.
888            // These blocks and their causal history will not be included in new commits.
889            // And their transactions do not need votes to finalize or skip.
890            //
891            // CommitFinalizer::gced_transaction_votes_for_pending_block() is the counterpart
892            // to this logic, when deciding if block A in the causal history of block B gets
893            // implicit accept transaction votes from block B.
894            if block_ref.round <= gc_round {
895                continue;
896            }
897            let block_info = self
898                .recent_blocks
899                .get_mut(&block_ref)
900                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
901            if block_info.included {
902                continue;
903            }
904            linked_blocks.push(block_ref);
905            block_info.included = true;
906            targets.extend(block_info.block.ancestors().iter());
907        }
908        linked_blocks
909    }
910
911    /// Returns true if the block has been included in an owned proposed block.
912    /// NOTE: caller should make sure only blocks above GC round are queried.
913    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
914        self.recent_blocks
915            .get(block_ref)
916            .unwrap_or_else(|| {
917                panic!(
918                    "Attempted to query for inclusion status for a block not in cached data {}",
919                    block_ref
920                )
921            })
922            .included
923    }
924
    /// Returns the round reported by the threshold clock (`ThresholdClock::get_round`).
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }
928
    /// The timestamp of when quorum threshold was last reached in the threshold clock,
    /// as reported by `ThresholdClock::get_quorum_ts`.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }
933
    /// Returns the tracked `highest_accepted_round` value.
    /// NOTE(review): presumably the highest round among accepted blocks; the field is
    /// maintained outside this section, confirm there.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
937
938    /// Returns the `RoundInfo` for `round`, if retained. `None` if the round has
939    /// been evicted, Gc'ed or has not been reached yet.
940    #[cfg(test)]
941    pub(crate) fn round_info(&self, round: Round) -> Option<&RoundInfo> {
942        let front_round = self.round_info.front()?.round;
943        if round < front_round || round <= self.gc_round() {
944            return None;
945        }
946        let round_info = self.round_info.get((round - front_round) as usize)?;
947        assert_eq!(
948            round_info.round, round,
949            "RoundInfo round {} does not match requested round {}. RoundInfo should be contiguous.",
950            round_info.round, round
951        );
952        Some(round_info)
953    }
954
955    /// Updates the `RoundInfo` for the round of the given block,
956    /// and creates a new `RoundInfo` if the block is in the next round.
957    fn update_round_info(&mut self, block: &VerifiedBlock) {
958        let block_ref = block.reference();
959
960        // RoundInfo is only kept for rounds above GC round.
961        let gc_round = self.gc_round();
962        if block.round() <= gc_round {
963            return;
964        }
965
966        // DAG can only grow one round at a time. Next round is at most 1 round after
967        // the current latest round.
968        let next_round = self
969            .round_info
970            .back()
971            .map(|info| info.round + 1)
972            // round_info starts empty on fresh startup or recovery.
973            .unwrap_or(gc_round + 1);
974        // DAG can only grow one round at a time.
975        assert!(
976            block.round() <= next_round,
977            "Attempted to update round info for block {block_ref} with round higher than next round {next_round}"
978        );
979        if block.round() == next_round {
980            self.round_info.push_back(RoundInfo {
981                round: block.round(),
982                authorities: BTreeSet::new(),
983                total_stake: 0,
984            });
985        }
986
987        // Update the RoundInfo of the block round.
988        let front_round = self
989            .round_info
990            .front()
991            .expect("round_info non-empty after extend")
992            .round;
993        let index = (block.round() - front_round) as usize;
994        let info = &mut self.round_info[index];
995        if info.authorities.insert(block_ref.author) {
996            info.total_stake += self.context.committee.stake(block_ref.author);
997        }
998    }
999
1000    // Buffers a new commit in memory and updates last committed rounds.
1001    // REQUIRED: must not skip over any commit index.
1002    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
1003        let time_diff = if let Some(last_commit) = &self.last_commit {
1004            if commit.index() <= last_commit.index() {
1005                error!(
1006                    "New commit index {} <= last commit index {}!",
1007                    commit.index(),
1008                    last_commit.index()
1009                );
1010                return;
1011            }
1012            assert_eq!(commit.index(), last_commit.index() + 1);
1013
1014            if commit.timestamp_ms() < last_commit.timestamp_ms() {
1015                panic!(
1016                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
1017                    last_commit, commit
1018                );
1019            }
1020            commit
1021                .timestamp_ms()
1022                .saturating_sub(last_commit.timestamp_ms())
1023        } else {
1024            assert_eq!(commit.index(), 1);
1025            0
1026        };
1027
1028        self.context
1029            .metrics
1030            .node_metrics
1031            .last_commit_time_diff
1032            .observe(time_diff as f64);
1033
1034        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
1035            previous_commit.round() < commit.round()
1036        } else {
1037            true
1038        };
1039
1040        self.last_commit = Some(commit.clone());
1041
1042        if commit_round_advanced {
1043            let now = std::time::Instant::now();
1044            if let Some(previous_time) = self.last_commit_round_advancement_time {
1045                self.context
1046                    .metrics
1047                    .node_metrics
1048                    .commit_round_advancement_interval
1049                    .observe(now.duration_since(previous_time).as_secs_f64())
1050            }
1051            self.last_commit_round_advancement_time = Some(now);
1052        }
1053
1054        for block_ref in commit.blocks().iter() {
1055            self.last_committed_rounds[block_ref.author] = max(
1056                self.last_committed_rounds[block_ref.author],
1057                block_ref.round,
1058            );
1059        }
1060
1061        for (i, round) in self.last_committed_rounds.iter().enumerate() {
1062            let index = self.context.committee.to_authority_index(i).unwrap();
1063            let hostname = &self.context.committee.authority(index).hostname;
1064            self.context
1065                .metrics
1066                .node_metrics
1067                .last_committed_authority_round
1068                .with_label_values(&[hostname])
1069                .set((*round).into());
1070        }
1071
1072        self.pending_commit_votes.push_back(commit.reference());
1073        self.commits_to_write.push(commit);
1074    }
1075
    /// Recovers commits to write from storage, at startup.
    ///
    /// The commits are only appended to the `commits_to_write` buffer; `last_commit` and the
    /// last committed rounds are not touched here.
    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
        self.commits_to_write.extend(commits);
    }
1080
    /// Asserts that no commits are buffered for writing.
    ///
    /// Panics, printing the buffered commits, if `commits_to_write` is not empty.
    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
        assert!(
            self.commits_to_write.is_empty(),
            "Commits to write should be empty. {:?}",
            self.commits_to_write,
        );
    }
1088
1089    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
1090        // We create an empty scoring subdag once reputation scores are calculated.
1091        // Note: It is okay for this to not be gated by protocol config as the
1092        // scoring_subdag should be empty in either case at this point.
1093        assert!(self.scoring_subdag.is_empty());
1094
1095        let commit_info = CommitInfo {
1096            committed_rounds: self.last_committed_rounds.clone(),
1097            reputation_scores,
1098        };
1099        let last_commit = self
1100            .last_commit
1101            .as_ref()
1102            .expect("Last commit should already be set.");
1103        self.commit_info_to_write
1104            .push((last_commit.reference(), commit_info));
1105    }
1106
    /// Buffers a finalized commit, identified by `commit_ref`, together with its rejected
    /// transactions (transaction indices keyed by block ref) in `finalized_commits_to_write`.
    /// The buffer is handed to the store by `flush()`.
    pub(crate) fn add_finalized_commit(
        &mut self,
        commit_ref: CommitRef,
        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
    ) {
        self.finalized_commits_to_write
            .push((commit_ref, rejected_transactions));
    }
1115
1116    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
1117        let mut votes = Vec::new();
1118        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
1119            votes.push(self.pending_commit_votes.pop_front().unwrap());
1120        }
1121        votes
1122    }
1123
1124    /// Index of the last commit.
1125    pub(crate) fn last_commit_index(&self) -> CommitIndex {
1126        match &self.last_commit {
1127            Some(commit) => commit.index(),
1128            None => 0,
1129        }
1130    }
1131
1132    /// Digest of the last commit.
1133    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1134        match &self.last_commit {
1135            Some(commit) => commit.digest(),
1136            None => CommitDigest::MIN,
1137        }
1138    }
1139
1140    /// Timestamp of the last commit.
1141    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1142        match &self.last_commit {
1143            Some(commit) => commit.timestamp_ms(),
1144            None => 0,
1145        }
1146    }
1147
1148    /// Leader slot of the last commit.
1149    pub(crate) fn last_commit_leader(&self) -> Slot {
1150        match &self.last_commit {
1151            Some(commit) => commit.leader().into(),
1152            None => self
1153                .genesis
1154                .iter()
1155                .next()
1156                .map(|(genesis_ref, _)| *genesis_ref)
1157                .expect("Genesis blocks should always be available.")
1158                .into(),
1159        }
1160    }
1161
1162    /// Highest round where a block is committed, which is last commit's leader round.
1163    pub(crate) fn last_commit_round(&self) -> Round {
1164        match &self.last_commit {
1165            Some(commit) => commit.leader().round,
1166            None => 0,
1167        }
1168    }
1169
    /// Last committed round per authority.
    /// Returns a clone of the internal vector, so callers get a snapshot.
    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
        self.last_committed_rounds.clone()
    }
1174
    /// The GC round is the highest round such that blocks of equal or lower round are considered
    /// obsolete and can no longer be committed. There is no point in accepting any block with
    /// round <= gc_round. The Garbage Collection (GC) round is calculated from the latest
    /// committed leader round (`last_commit_round()`). When GC is disabled this returns the
    /// genesis round.
    pub(crate) fn gc_round(&self) -> Round {
        self.calculate_gc_round(self.last_commit_round())
    }
1181
    /// Calculates the GC round from the input leader round, which can be different
    /// from the last committed leader round.
    ///
    /// The result is `commit_round` minus the protocol config's `gc_depth()`, saturating at 0.
    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
    }
1187
    /// Flushes unpersisted blocks, commits, commit info and finalized commits to storage,
    /// then evicts old entries from the in-memory caches.
    ///
    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
    /// must also be buffered.
    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
    ///
    /// After each flush, DagState becomes persisted in storage and is expected to recover
    /// all internal states from storage after restarts.
    ///
    /// When nothing is buffered the method returns early, which also skips the cache cleanup.
    ///
    /// Panics if writing to storage fails.
    pub(crate) fn flush(&mut self) {
        // Timer handle kept alive for the whole call (presumably observed when dropped).
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["DagState::flush"])
            .start_timer();

        // Flush buffered data to storage.
        // Taking the buffers leaves them empty, so a later flush cannot write them twice.
        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
        let pending_commits = std::mem::take(&mut self.commits_to_write);
        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
        if pending_blocks.is_empty()
            && pending_commits.is_empty()
            && pending_commit_info.is_empty()
            && pending_finalized_commits.is_empty()
        {
            return;
        }

        debug!(
            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
            pending_blocks.len(),
            pending_blocks
                .iter()
                .map(|b| b.reference().to_string())
                .join(","),
            pending_commits.len(),
            pending_commits
                .iter()
                .map(|c| c.reference().to_string())
                .join(","),
            pending_commit_info.len(),
            pending_commit_info
                .iter()
                .map(|(commit_ref, _)| commit_ref.to_string())
                .join(","),
            pending_finalized_commits.len(),
            pending_finalized_commits
                .iter()
                .map(|(commit_ref, _)| commit_ref.to_string())
                .join(","),
        );
        // Everything buffered goes to the store in a single write batch.
        self.store
            .write(WriteBatch::new(
                pending_blocks,
                pending_commits,
                pending_commit_info,
                pending_finalized_commits,
            ))
            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
        self.context
            .metrics
            .node_metrics
            .dag_state_store_write_count
            .inc();

        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
        for (authority_index, _) in self.context.committee.authorities() {
            let eviction_round = self.calculate_authority_eviction_round(authority_index);
            // Drop the lowest refs of this authority while they are at or below the eviction round,
            // removing the matching cached block along with each ref.
            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
                if block_ref.round <= eviction_round {
                    self.recent_blocks.remove(block_ref);
                    self.recent_refs_by_authority[authority_index].pop_first();
                } else {
                    break;
                }
            }
            self.evicted_rounds[authority_index] = eviction_round;
        }

        // Clean up old RoundInfo at or below gc_round.
        while let Some(info) = self.round_info.front() {
            if info.round <= self.gc_round() {
                self.round_info.pop_front();
            } else {
                break;
            }
        }

        // Report the cache sizes after eviction.
        let metrics = &self.context.metrics.node_metrics;
        metrics
            .dag_state_recent_blocks
            .set(self.recent_blocks.len() as i64);
        metrics.dag_state_recent_refs.set(
            self.recent_refs_by_authority
                .iter()
                .map(BTreeSet::len)
                .sum::<usize>() as i64,
        );
    }
1290
    /// Reads the last commit info, together with the ref of its commit, from the store.
    /// Returns whatever the store reports, which may be `None`.
    ///
    /// Panics if the store read fails.
    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
        self.store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
    }
1296
    /// Forwards the given committed subdags to the scoring subdag (`ScoringSubdag::add_subdags`).
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }
1300
    /// Clears the scoring subdag (`ScoringSubdag::clear`).
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }
1304
    /// Returns the number of scored subdags reported by the scoring subdag
    /// (`ScoringSubdag::scored_subdags_count`).
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }
1308
    /// Computes reputation scores from the scoring subdag by delegating to
    /// `ScoringSubdag::calculate_distributed_vote_scores`.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }
1312
    /// Returns the end of the scoring subdag's commit range.
    ///
    /// Panics if the scoring subdag has no commit range set.
    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
        self.scoring_subdag
            .commit_range
            .as_ref()
            .expect("commit range should exist for scoring subdag")
            .end()
    }
1320
1321    /// The last round that should get evicted after a cache clean up operation. After this round we are
1322    /// guaranteed to have all the produced blocks from that authority. For any round that is
1323    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1324    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1325        let last_round = self.recent_refs_by_authority[authority_index]
1326            .last()
1327            .map(|block_ref| block_ref.round)
1328            .unwrap_or(GENESIS_ROUND);
1329
1330        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1331    }
1332
1333    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1334    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1335    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1336        gc_round.min(last_round.saturating_sub(cached_rounds))
1337    }
1338
    /// Returns the underlying store, as a cloned `Arc` handle to the same store instance.
    pub(crate) fn store(&self) -> Arc<dyn Store> {
        self.store.clone()
    }
1343
1344    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1345    /// the quorum even if that's genesis.
1346    #[cfg(test)]
1347    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1348        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1349        // a quorum then it means that our DAG has advanced with missing causal history.
1350        for round in
1351            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1352        {
1353            if round == GENESIS_ROUND {
1354                return self.genesis_blocks();
1355            }
1356            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1357            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1358
1359            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1360            let blocks = self.get_uncommitted_blocks_at_round(round);
1361            for block in &blocks {
1362                if quorum.add(block.author(), &self.context.committee) {
1363                    return blocks;
1364                }
1365            }
1366        }
1367
1368        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1369    }
1370
    /// Test helper: returns clones of all genesis blocks, in `BlockRef` key order.
    #[cfg(test)]
    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
        self.genesis.values().cloned().collect()
    }
1375
    /// Test helper: overrides `last_commit` directly. Unlike `add_commit`, nothing else is
    /// updated (no validation, no metrics, no buffering for write).
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1380}
1381
/// Per-block bookkeeping kept alongside a cached block: the block itself, aggregated
/// information about its children, and its committed / included flags.
struct BlockInfo {
    /// The cached block.
    block: VerifiedBlock,

    /// Used in computing commits and leader schedule in Mysticeti v3.
    /// Next-round blocks which have this block as an ancestor.
    children: BTreeSet<BlockRef>,
    /// Distinct authorities that have authored one of the entries in `children`.
    children_authorities: BTreeSet<AuthorityIndex>,
    /// Sum of stake across `children_authorities`.
    total_children_stake: Stake,

    /// Whether the block has been committed.
    committed: bool,
    /// Whether the block has been included in the causal history of an owned proposed block.
    ///
    /// There are two usages of this field:
    /// 1. When proposing blocks, determine the set of blocks to carry votes for.
    /// 2. When recovering, determine if a block has not been included in a proposed block and
    ///    should recover transaction votes by voting.
    included: bool,
}
1403
1404impl BlockInfo {
1405    fn new(block: VerifiedBlock) -> Self {
1406        Self {
1407            block,
1408            children: BTreeSet::new(),
1409            children_authorities: BTreeSet::new(),
1410            total_children_stake: 0,
1411            committed: false,
1412            included: false,
1413        }
1414    }
1415}
1416
/// Aggregates information about blocks accepted at a single round.
/// Used for commit generation and leader schedule calculation in Mysticeti v3.
/// RoundInfo is only kept for rounds above GC round.
pub(crate) struct RoundInfo {
    /// The round this entry aggregates.
    pub(crate) round: Round,
    /// Distinct authorities that have authored at least one accepted block in `round`.
    pub(crate) authorities: BTreeSet<AuthorityIndex>,
    /// Sum of stake across `authorities`. Each authority is counted once, no matter how
    /// many blocks it authored in `round`.
    pub(crate) total_stake: Stake,
}
1427
1428#[cfg(test)]
1429mod test {
1430    use std::vec;
1431
1432    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1433    use parking_lot::RwLock;
1434
1435    use super::*;
1436    use crate::{
1437        block::{TestBlock, VerifiedBlock},
1438        storage::{WriteBatch, mem_store::MemStore},
1439        test_dag_builder::DagBuilder,
1440        test_dag_parser::parse_dag,
1441    };
1442
    /// Accepts blocks for rounds 1..=10 from three of the four authorities (with several
    /// blocks per slot for the non-own authorities) and checks lookups by block ref,
    /// by slot and by round, for both existing and non-existent targets.
    #[tokio::test]
    async fn test_get_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());
        let own_index = AuthorityIndex::new_for_test(0);

        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
        let num_rounds: u32 = 10;
        let non_existent_round: u32 = 100;
        let num_authorities: u32 = 3;
        let num_blocks_per_slot: usize = 3;
        // Every accepted block, keyed by its ref, used as the expectation below.
        let mut blocks = BTreeMap::new();
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                // Create 3 blocks per slot, with different timestamps and digests.
                let base_ts = round as BlockTimestampMs * 1000;
                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
                    let block = VerifiedBlock::new_for_test(
                        TestBlock::new(round, author)
                            .set_timestamp_ms(timestamp)
                            .build(),
                    );
                    dag_state.accept_block(block.clone());
                    blocks.insert(block.reference(), block);

                    // Only write one block per slot for own index
                    if AuthorityIndex::new_for_test(author) == own_index {
                        break;
                    }
                }
            }
        }

        // Check uncommitted blocks that exist.
        for (r, block) in &blocks {
            assert_eq!(&dag_state.get_block(r).unwrap(), block);
        }

        // Check uncommitted blocks that do not exist.
        // The slot exists, but the MIN digest is expected not to match any accepted block.
        let last_ref = blocks.keys().last().unwrap();
        assert!(
            dag_state
                .get_block(&BlockRef::new(
                    last_ref.round,
                    last_ref.author,
                    BlockDigest::MIN
                ))
                .is_none()
        );

        // Check slots with uncommitted blocks.
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                let slot = Slot::new(
                    round,
                    context
                        .committee
                        .to_authority_index(author as usize)
                        .unwrap(),
                );
                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);

                // We only write one block per slot for own index
                if AuthorityIndex::new_for_test(author) == own_index {
                    assert_eq!(blocks.len(), 1);
                } else {
                    assert_eq!(blocks.len(), num_blocks_per_slot);
                }

                // Every returned block must belong to the queried slot.
                for b in blocks {
                    assert_eq!(b.round(), round);
                    assert_eq!(
                        b.author(),
                        context
                            .committee
                            .to_authority_index(author as usize)
                            .unwrap()
                    );
                }
            }
        }

        // Check slots without uncommitted blocks.
        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());

        // Check rounds with uncommitted blocks.
        for round in 1..=num_rounds {
            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
            // Expect 3 blocks per authority except for own authority which should
            // have 1 block.
            assert_eq!(
                blocks.len(),
                (num_authorities - 1) as usize * num_blocks_per_slot + 1
            );
            for b in blocks {
                assert_eq!(b.round(), round);
            }
        }

        // Check rounds without uncommitted blocks.
        assert!(
            dag_state
                .get_uncommitted_blocks_at_round(non_existent_round)
                .is_empty()
        );
    }
1552
1553    #[tokio::test]
1554    async fn test_ancestors_at_uncommitted_round() {
1555        // Initialize DagState.
1556        let (context, _) = Context::new_for_test(4);
1557        let context = Arc::new(context);
1558        let store = Arc::new(MemStore::new());
1559        let mut dag_state = DagState::new(context.clone(), store.clone());
1560
1561        // Populate DagState.
1562
1563        // Round 10 refs will not have their blocks in DagState.
1564        let round_10_refs: Vec<_> = (0..4)
1565            .map(|a| {
1566                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1567                    .reference()
1568            })
1569            .collect();
1570
1571        // Round 11 blocks.
1572        let round_11 = [
1573            // This will connect to round 12.
1574            VerifiedBlock::new_for_test(
1575                TestBlock::new(11, 0)
1576                    .set_timestamp_ms(1100)
1577                    .set_ancestors(round_10_refs.clone())
1578                    .build(),
1579            ),
1580            // Slot(11, 1) has 3 blocks.
1581            // This will connect to round 12.
1582            VerifiedBlock::new_for_test(
1583                TestBlock::new(11, 1)
1584                    .set_timestamp_ms(1110)
1585                    .set_ancestors(round_10_refs.clone())
1586                    .build(),
1587            ),
1588            // This will connect to round 13.
1589            VerifiedBlock::new_for_test(
1590                TestBlock::new(11, 1)
1591                    .set_timestamp_ms(1111)
1592                    .set_ancestors(round_10_refs.clone())
1593                    .build(),
1594            ),
1595            // This will not connect to any block.
1596            VerifiedBlock::new_for_test(
1597                TestBlock::new(11, 1)
1598                    .set_timestamp_ms(1112)
1599                    .set_ancestors(round_10_refs.clone())
1600                    .build(),
1601            ),
1602            // This will not connect to any block.
1603            VerifiedBlock::new_for_test(
1604                TestBlock::new(11, 2)
1605                    .set_timestamp_ms(1120)
1606                    .set_ancestors(round_10_refs.clone())
1607                    .build(),
1608            ),
1609            // This will connect to round 12.
1610            VerifiedBlock::new_for_test(
1611                TestBlock::new(11, 3)
1612                    .set_timestamp_ms(1130)
1613                    .set_ancestors(round_10_refs.clone())
1614                    .build(),
1615            ),
1616        ];
1617
1618        // Round 12 blocks.
1619        let ancestors_for_round_12 = vec![
1620            round_11[0].reference(),
1621            round_11[1].reference(),
1622            round_11[5].reference(),
1623        ];
1624        let round_12 = [
1625            VerifiedBlock::new_for_test(
1626                TestBlock::new(12, 0)
1627                    .set_timestamp_ms(1200)
1628                    .set_ancestors(ancestors_for_round_12.clone())
1629                    .build(),
1630            ),
1631            VerifiedBlock::new_for_test(
1632                TestBlock::new(12, 2)
1633                    .set_timestamp_ms(1220)
1634                    .set_ancestors(ancestors_for_round_12.clone())
1635                    .build(),
1636            ),
1637            VerifiedBlock::new_for_test(
1638                TestBlock::new(12, 3)
1639                    .set_timestamp_ms(1230)
1640                    .set_ancestors(ancestors_for_round_12.clone())
1641                    .build(),
1642            ),
1643        ];
1644
1645        // Round 13 blocks.
1646        let ancestors_for_round_13 = vec![
1647            round_12[0].reference(),
1648            round_12[1].reference(),
1649            round_12[2].reference(),
1650            round_11[2].reference(),
1651        ];
1652        let round_13 = [
1653            VerifiedBlock::new_for_test(
1654                TestBlock::new(12, 1)
1655                    .set_timestamp_ms(1300)
1656                    .set_ancestors(ancestors_for_round_13.clone())
1657                    .build(),
1658            ),
1659            VerifiedBlock::new_for_test(
1660                TestBlock::new(12, 2)
1661                    .set_timestamp_ms(1320)
1662                    .set_ancestors(ancestors_for_round_13.clone())
1663                    .build(),
1664            ),
1665            VerifiedBlock::new_for_test(
1666                TestBlock::new(12, 3)
1667                    .set_timestamp_ms(1330)
1668                    .set_ancestors(ancestors_for_round_13.clone())
1669                    .build(),
1670            ),
1671        ];
1672
1673        // Round 14 anchor block.
1674        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1675        let anchor = VerifiedBlock::new_for_test(
1676            TestBlock::new(14, 1)
1677                .set_timestamp_ms(1410)
1678                .set_ancestors(ancestors_for_round_14)
1679                .build(),
1680        );
1681
1682        // Add all blocks (at and above round 11) to DagState.
1683        for b in round_11
1684            .iter()
1685            .chain(round_12.iter())
1686            .chain(round_13.iter())
1687            .chain([anchor.clone()].iter())
1688        {
1689            dag_state.accept_block(b.clone());
1690        }
1691
1692        // Check ancestors connected to anchor.
1693        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1694        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1695        ancestors_refs.sort();
1696        let mut expected_refs = vec![
1697            round_11[0].reference(),
1698            round_11[1].reference(),
1699            round_11[2].reference(),
1700            round_11[5].reference(),
1701        ];
1702        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1703        assert_eq!(
1704            ancestors_refs, expected_refs,
1705            "Expected round 11 ancestors: {:?}. Got: {:?}",
1706            expected_refs, ancestors_refs
1707        );
1708    }
1709
1710    #[tokio::test]
1711    async fn test_link_causal_history() {
1712        let (mut context, _) = Context::new_for_test(4);
1713        context.parameters.dag_state_cached_rounds = 10;
1714        context.protocol_config.set_gc_depth_for_testing(3);
1715        let context = Arc::new(context);
1716
1717        let store = Arc::new(MemStore::new());
1718        let mut dag_state = DagState::new(context.clone(), store.clone());
1719
1720        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1721        let mut dag_builder = DagBuilder::new(context.clone());
1722        dag_builder.layers(1..=3).build();
1723        dag_builder
1724            .layers(4..=6)
1725            .authorities(vec![AuthorityIndex::new_for_test(0)])
1726            .skip_block()
1727            .build();
1728
1729        // Accept all blocks
1730        let all_blocks = dag_builder.all_blocks();
1731        dag_state.accept_blocks(all_blocks.clone());
1732
1733        // No block is linked yet.
1734        for block in &all_blocks {
1735            assert!(!dag_state.has_been_included(&block.reference()));
1736        }
1737
1738        // Link causal history from a round 1 block.
1739        let round_1_block = &all_blocks[1];
1740        assert_eq!(round_1_block.round(), 1);
1741        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1742
1743        // Check that the block is linked.
1744        assert_eq!(linked_blocks.len(), 1);
1745        assert_eq!(linked_blocks[0], round_1_block.reference());
1746        for block_ref in linked_blocks {
1747            assert!(dag_state.has_been_included(&block_ref));
1748        }
1749
1750        // Link causal history from a round 2 block.
1751        let round_2_block = &all_blocks[4];
1752        assert_eq!(round_2_block.round(), 2);
1753        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1754
1755        // Check the linked blocks.
1756        assert_eq!(linked_blocks.len(), 4);
1757        for block_ref in linked_blocks {
1758            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1759        }
1760
1761        // Check linked status in dag state.
1762        for block in &all_blocks {
1763            if block.round() == 1 || block.reference() == round_2_block.reference() {
1764                assert!(dag_state.has_been_included(&block.reference()));
1765            } else {
1766                assert!(!dag_state.has_been_included(&block.reference()));
1767            }
1768        }
1769
1770        // Select round 6 block.
1771        let round_6_block = all_blocks.last().unwrap();
1772        assert_eq!(round_6_block.round(), 6);
1773
1774        // Get GC round to 3.
1775        let last_commit = TrustedCommit::new_for_test(
1776            6,
1777            CommitDigest::MIN,
1778            context.clock.timestamp_utc_ms(),
1779            round_6_block.reference(),
1780            vec![],
1781        );
1782        dag_state.set_last_commit(last_commit);
1783        assert_eq!(
1784            dag_state.gc_round(),
1785            3,
1786            "GC round should have moved to round 3"
1787        );
1788
1789        // Link causal history from a round 6 block.
1790        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1791
1792        // Check the linked blocks. They should not include GC'ed blocks.
1793        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1794        for block_ref in linked_blocks {
1795            assert!(
1796                block_ref.round == 4
1797                    || block_ref.round == 5
1798                    || block_ref == round_6_block.reference()
1799            );
1800        }
1801
1802        // Check linked status in dag state.
1803        for block in &all_blocks {
1804            let block_ref = block.reference();
1805            if block.round() == 1
1806                || block_ref == round_2_block.reference()
1807                || block_ref.round == 4
1808                || block_ref.round == 5
1809                || block_ref == round_6_block.reference()
1810            {
1811                assert!(dag_state.has_been_included(&block.reference()));
1812            } else {
1813                assert!(!dag_state.has_been_included(&block.reference()));
1814            }
1815        }
1816    }
1817
1818    #[tokio::test]
1819    async fn test_block_children_basics() {
1820        let (mut context, _) = Context::new_for_test(4);
1821        // Small cached_rounds so a later flush actually evicts below-gc
1822        // entries from recent_blocks.
1823        context.parameters.dag_state_cached_rounds = 2;
1824        context.protocol_config.set_gc_depth_for_testing(3);
1825        context.protocol_config.set_enable_v3_for_testing(true);
1826        let context = Arc::new(context);
1827
1828        let store = Arc::new(MemStore::new());
1829        let mut dag_state = DagState::new(context.clone(), store.clone());
1830
1831        // Dense 4-authority DAG for rounds 1..=5 with default full links.
1832        let mut dag_builder = DagBuilder::new(context.clone());
1833        dag_builder.layers(1..=5).build();
1834
1835        let all_blocks = dag_builder.all_blocks();
1836        dag_state.accept_blocks(all_blocks.clone());
1837
1838        // Expected children: for each block, the round+1 blocks that reference it via a
1839        // parent (strong) link.
1840        let mut expected_children: BTreeMap<BlockRef, BTreeSet<BlockRef>> = BTreeMap::new();
1841        for block in &all_blocks {
1842            expected_children
1843                .entry(block.reference())
1844                .or_default()
1845                .extend(
1846                    all_blocks
1847                        .iter()
1848                        .filter(|b| b.round() == block.round() + 1)
1849                        .map(|b| b.reference()),
1850                );
1851        }
1852
1853        // Verify get_block_children() returns the expected set for every block.
1854        for block in &all_blocks {
1855            let block_ref = block.reference();
1856            let actual: BTreeSet<BlockRef> = dag_state
1857                .get_block_children(&block_ref)
1858                .expect("accepted block should be in recent_blocks")
1859                .into_iter()
1860                .collect();
1861            let want = expected_children.get(&block_ref).cloned().unwrap();
1862            assert_eq!(actual, want, "mismatched children for {block_ref:?}");
1863
1864            // Derived fields: distinct authors of the child set, and summed stake.
1865            let expected_authorities: BTreeSet<AuthorityIndex> =
1866                want.iter().map(|r| r.author).collect();
1867            let expected_stake: Stake = expected_authorities
1868                .iter()
1869                .map(|a| context.committee.stake(*a))
1870                .sum();
1871            assert_eq!(
1872                dag_state
1873                    .get_block_children_authorities(&block_ref)
1874                    .expect("accepted block should be in recent_blocks"),
1875                expected_authorities,
1876                "mismatched children_authorities for {block_ref:?}"
1877            );
1878            assert_eq!(
1879                dag_state
1880                    .get_block_total_children_stake(&block_ref)
1881                    .expect("accepted block should be in recent_blocks"),
1882                expected_stake,
1883                "mismatched total_children_stake for {block_ref:?}"
1884            );
1885        }
1886
1887        // Idempotence: re-accepting a block should be a no-op (via the
1888        // contains_block early-return) and must not change any parent's
1889        // children set.
1890        let round_2_block = all_blocks
1891            .iter()
1892            .find(|b| b.round() == 2)
1893            .expect("should have a round-2 block")
1894            .clone();
1895        let before: Vec<(BlockRef, BTreeSet<BlockRef>)> = round_2_block
1896            .ancestors()
1897            .iter()
1898            .map(|a| {
1899                (
1900                    *a,
1901                    dag_state
1902                        .get_block_children(a)
1903                        .unwrap()
1904                        .into_iter()
1905                        .collect(),
1906                )
1907            })
1908            .collect();
1909        dag_state.accept_block(round_2_block);
1910        for (ancestor, before_set) in before {
1911            let after: BTreeSet<BlockRef> = dag_state
1912                .get_block_children(&ancestor)
1913                .unwrap()
1914                .into_iter()
1915                .collect();
1916            assert_eq!(
1917                before_set, after,
1918                "children changed for {ancestor:?} after re-accept"
1919            );
1920        }
1921
1922        // GC boundary: commit a round-5 leader so gc_round advances to 2, then
1923        // flush to evict blocks with round <= eviction_round (= min(gc_round,
1924        // last_round - cached_rounds) = min(2, 5-2) = 2).
1925        let round_5_leader = all_blocks
1926            .last()
1927            .expect("last block should be round 5")
1928            .reference();
1929        let last_commit = TrustedCommit::new_for_test(
1930            5,
1931            CommitDigest::MIN,
1932            context.clock.timestamp_utc_ms(),
1933            round_5_leader,
1934            vec![],
1935        );
1936        dag_state.set_last_commit(last_commit);
1937        assert_eq!(dag_state.gc_round(), 2);
1938
1939        // Child metadata should expose logical retention immediately when the
1940        // GC round advances, even before flush physically removes stale entries.
1941        for block in all_blocks.iter().filter(|block| block.round() <= 2) {
1942            let block_ref = block.reference();
1943            assert!(
1944                dag_state.get_block_children(&block_ref).is_none(),
1945                "below-gc block {block_ref:?} should hide children before flush"
1946            );
1947            assert!(
1948                dag_state
1949                    .get_block_children_authorities(&block_ref)
1950                    .is_none(),
1951                "below-gc block {block_ref:?} should hide child authorities before flush"
1952            );
1953            assert!(
1954                dag_state
1955                    .get_block_total_children_stake(&block_ref)
1956                    .is_none(),
1957                "below-gc block {block_ref:?} should hide child stake before flush"
1958            );
1959        }
1960
1961        dag_state.flush();
1962
1963        // After flush: rounds 1..=2 evicted from recent_blocks → None.
1964        // Rounds 3..=4 retain their children sets; round 5 is above GC but has
1965        // no round-6 children.
1966        for block in &all_blocks {
1967            let block_ref = block.reference();
1968            match block.round() {
1969                1..=2 => assert!(
1970                    dag_state.get_block_children(&block_ref).is_none(),
1971                    "round {} block {block_ref:?} should be evicted after flush",
1972                    block.round()
1973                ),
1974                3..=4 => {
1975                    let actual: BTreeSet<BlockRef> = dag_state
1976                        .get_block_children(&block_ref)
1977                        .expect("above-gc block should remain")
1978                        .into_iter()
1979                        .collect();
1980                    let want = expected_children
1981                        .get(&block_ref)
1982                        .cloned()
1983                        .unwrap_or_default();
1984                    assert_eq!(
1985                        actual, want,
1986                        "children changed for {block_ref:?} after flush"
1987                    );
1988                    // Derived fields survive the flush since we only evict
1989                    // below-GC entries.
1990                    let expected_authorities: BTreeSet<AuthorityIndex> =
1991                        want.iter().map(|r| r.author).collect();
1992                    let expected_stake: Stake = expected_authorities
1993                        .iter()
1994                        .map(|a| context.committee.stake(*a))
1995                        .sum();
1996                    assert_eq!(
1997                        dag_state
1998                            .get_block_children_authorities(&block_ref)
1999                            .expect("above-gc block should remain"),
2000                        expected_authorities,
2001                        "children_authorities changed for {block_ref:?} after flush"
2002                    );
2003                    assert_eq!(
2004                        dag_state
2005                            .get_block_total_children_stake(&block_ref)
2006                            .expect("above-gc block should remain"),
2007                        expected_stake,
2008                        "total_children_stake changed for {block_ref:?} after flush"
2009                    );
2010                }
2011                5 => {
2012                    let actual = dag_state
2013                        .get_block_children(&block_ref)
2014                        .expect("round-5 block should remain");
2015                    assert!(
2016                        actual.is_empty(),
2017                        "round-5 block {block_ref:?} has unexpected children {actual:?}"
2018                    );
2019                    assert!(
2020                        dag_state
2021                            .get_block_children_authorities(&block_ref)
2022                            .expect("round-5 block should remain")
2023                            .is_empty(),
2024                        "round-5 block {block_ref:?} must have no children_authorities"
2025                    );
2026                    assert_eq!(
2027                        dag_state
2028                            .get_block_total_children_stake(&block_ref)
2029                            .expect("round-5 block should remain"),
2030                        0,
2031                        "round-5 block {block_ref:?} must have zero total_children_stake"
2032                    );
2033                }
2034                _ => unreachable!(),
2035            }
2036        }
2037    }
2038
2039    #[tokio::test]
2040    async fn test_block_children_exclusion() {
2041        let (mut context, _) = Context::new_for_test(4);
2042        context.parameters.dag_state_cached_rounds = 10;
2043        context.protocol_config.set_gc_depth_for_testing(3);
2044        context.protocol_config.set_enable_v3_for_testing(true);
2045        let context = Arc::new(context);
2046
2047        let store = Arc::new(MemStore::new());
2048        let mut dag_state = DagState::new(context.clone(), store.clone());
2049
2050        // Accept rounds 1..=2 with default full links.
2051        let mut dag_builder = DagBuilder::new(context.clone());
2052        dag_builder.layers(1..=2).build();
2053        let base_blocks = dag_builder.all_blocks();
2054        dag_state.accept_blocks(base_blocks.clone());
2055
2056        // Craft a round-3 block with mixed strong + weak ancestors:
2057        //   strong: three round-2 blocks (ancestor.round + 1 == 3, authority other than 0)
2058        //   weak:   one round-1 block (ancestor.round + 1 == 2, not 3, authority 0)
2059        let round_2_refs: Vec<BlockRef> = base_blocks
2060            .iter()
2061            .filter(|b| b.round() == 2 && b.author() != AuthorityIndex::new_for_test(0))
2062            .map(|b| b.reference())
2063            .collect();
2064        let weak_ancestor = base_blocks
2065            .iter()
2066            .find(|b| b.round() == 1 && b.author() == AuthorityIndex::new_for_test(0))
2067            .expect("should have a round-1 authority-0 block")
2068            .reference();
2069
2070        let mut ancestors = round_2_refs.clone();
2071        ancestors.push(weak_ancestor);
2072        let round_3 =
2073            VerifiedBlock::new_for_test(TestBlock::new(3, 1).set_ancestors_raw(ancestors).build());
2074        let round_3_ref = round_3.reference();
2075        dag_state.accept_block(round_3);
2076
2077        // Strong ancestors (round 2): each should now list round_3_ref as a child.
2078        // Derived fields: only authority 1 (round_3's author) is represented, so
2079        // children_authorities == {1} and total_children_stake == stake(1).
2080        let author_1 = AuthorityIndex::new_for_test(1);
2081        let stake_1 = context.committee.stake(author_1);
2082        for r2_ref in &round_2_refs {
2083            let children = dag_state
2084                .get_block_children(r2_ref)
2085                .expect("round-2 block should still be present");
2086            assert!(
2087                children.contains(&round_3_ref),
2088                "round-2 parent {r2_ref:?} should have round-3 block as child",
2089            );
2090            let authorities = dag_state
2091                .get_block_children_authorities(r2_ref)
2092                .expect("round-2 block should still be present");
2093            assert_eq!(
2094                authorities,
2095                BTreeSet::from([author_1]),
2096                "round-2 parent {r2_ref:?} should have children_authorities == {{1}}",
2097            );
2098            assert_eq!(
2099                dag_state
2100                    .get_block_total_children_stake(r2_ref)
2101                    .expect("round-2 block should still be present"),
2102                stake_1,
2103                "round-2 parent {r2_ref:?} total_children_stake mismatch",
2104            );
2105        }
2106
2107        // Weak ancestor (round 1): must NOT have round_3_ref as a child, and
2108        // its children set should only contain the natural round-2 population.
2109        let weak_children = dag_state
2110            .get_block_children(&weak_ancestor)
2111            .expect("weak ancestor should still be present");
2112        assert!(
2113            !weak_children.contains(&round_3_ref),
2114            "weak ancestor {weak_ancestor:?} must NOT have round-3 block as child",
2115        );
2116        for c in &weak_children {
2117            assert_eq!(
2118                c.round, 2,
2119                "weak ancestor's children should all be round 2, got {c:?}"
2120            );
2121        }
2122        // Derived fields for the weak ancestor: children are the 4 natural
2123        // round-2 blocks authored by all 4 authorities, so authorities =
2124        // {0,1,2,3} and stake = total_stake. round_3 does not contribute
2125        // because the weak link was filtered out.
2126        let weak_authorities = dag_state
2127            .get_block_children_authorities(&weak_ancestor)
2128            .expect("weak ancestor should still be present");
2129        let expected_weak_authorities: BTreeSet<AuthorityIndex> =
2130            (0..4).map(AuthorityIndex::new_for_test).collect();
2131        assert_eq!(
2132            weak_authorities, expected_weak_authorities,
2133            "weak ancestor {weak_ancestor:?} children_authorities should cover all 4 round-2 authors",
2134        );
2135        assert_eq!(
2136            dag_state
2137                .get_block_total_children_stake(&weak_ancestor)
2138                .expect("weak ancestor should still be present"),
2139            context.committee.total_stake(),
2140            "weak ancestor {weak_ancestor:?} total_children_stake should equal total committee stake",
2141        );
2142    }
2143
2144    #[tokio::test]
2145    async fn test_round_info() {
2146        let (mut context, _) = Context::new_for_test(4);
2147        // Small cached_rounds so a later flush evicts below-gc round_info entries.
2148        context.parameters.dag_state_cached_rounds = 2;
2149        context.protocol_config.set_gc_depth_for_testing(3);
2150        context.protocol_config.set_enable_v3_for_testing(true);
2151        let context = Arc::new(context);
2152
2153        let store = Arc::new(MemStore::new());
2154        let mut dag_state = DagState::new(context.clone(), store.clone());
2155
2156        // Before any blocks: no round_info entries exist.
2157        assert!(dag_state.round_info(1).is_none());
2158
2159        // Accept blocks for rounds 1..=4 with full participation. Each round
2160        // must aggregate every authority and stake equal to total_stake.
2161        let mut dag_builder = DagBuilder::new(context.clone());
2162        dag_builder.layers(1..=4).build();
2163        dag_state.accept_blocks(dag_builder.all_blocks());
2164
2165        let all_authorities: BTreeSet<AuthorityIndex> =
2166            (0..4).map(AuthorityIndex::new_for_test).collect();
2167        for round in 1..=4 {
2168            let info = dag_state
2169                .round_info(round)
2170                .unwrap_or_else(|| panic!("round_info missing for round {round}"));
2171            assert_eq!(info.round, round);
2172            assert_eq!(
2173                info.authorities, all_authorities,
2174                "round {round} authorities mismatch"
2175            );
2176            assert_eq!(
2177                info.total_stake,
2178                context.committee.total_stake(),
2179                "round {round} total_stake mismatch"
2180            );
2181        }
2182
2183        // Beyond highest_accepted_round: no entry yet.
2184        assert!(dag_state.round_info(5).is_none());
2185
2186        // Re-accepting the same blocks must not double-count stake. The
2187        // contains_block early-return inside accept_block guards this.
2188        dag_state.accept_blocks(dag_builder.all_blocks());
2189        for round in 1..=4 {
2190            let info = dag_state.round_info(round).unwrap();
2191            assert_eq!(
2192                info.authorities, all_authorities,
2193                "round {round} authorities changed after re-accept"
2194            );
2195            assert_eq!(
2196                info.total_stake,
2197                context.committee.total_stake(),
2198                "round {round} total_stake changed after re-accept"
2199            );
2200        }
2201
2202        // Partial round: extend the DAG with only authority 0 at round 5.
2203        let round_5_block = VerifiedBlock::new_for_test(
2204            TestBlock::new(5, 0)
2205                .set_ancestors_raw(
2206                    dag_builder
2207                        .all_blocks()
2208                        .iter()
2209                        .filter(|b| b.round() == 4)
2210                        .map(|b| b.reference())
2211                        .collect(),
2212                )
2213                .build(),
2214        );
2215        dag_state.accept_block(round_5_block);
2216        let info_5 = dag_state.round_info(5).expect("round 5 entry should exist");
2217        let author_0 = AuthorityIndex::new_for_test(0);
2218        assert_eq!(info_5.authorities, BTreeSet::from([author_0]));
2219        assert_eq!(info_5.total_stake, context.committee.stake(author_0));
2220
2221        // GC boundary: commit a round-5 leader so gc_round advances to 2,
2222        // then flush to evict round_info entries with round <= gc_round.
2223        let round_5_leader_ref = dag_state
2224            .recent_refs_by_authority
2225            .iter()
2226            .flat_map(|set| set.iter())
2227            .find(|r| r.round == 5)
2228            .copied()
2229            .expect("round 5 block should be accepted");
2230        let last_commit = TrustedCommit::new_for_test(
2231            5,
2232            CommitDigest::MIN,
2233            context.clock.timestamp_utc_ms(),
2234            round_5_leader_ref,
2235            vec![],
2236        );
2237        dag_state.set_last_commit(last_commit);
2238        assert_eq!(dag_state.gc_round(), 2);
2239
2240        // RoundInfo should expose logical retention immediately when the GC
2241        // round advances, even before flush physically removes stale entries.
2242        for round in 1..=2 {
2243            assert!(
2244                dag_state.round_info(round).is_none(),
2245                "round {round} should be hidden before flush after GC advances"
2246            );
2247        }
2248
2249        dag_state.flush();
2250
2251        // Rounds 1..=2 are evicted; rounds 3..=5 remain with their aggregates.
2252        for round in 1..=2 {
2253            assert!(
2254                dag_state.round_info(round).is_none(),
2255                "round {round} should be evicted after flush"
2256            );
2257        }
2258        for round in 3..=4 {
2259            let info = dag_state
2260                .round_info(round)
2261                .unwrap_or_else(|| panic!("round_info missing for round {round} after flush"));
2262            assert_eq!(info.authorities, all_authorities);
2263            assert_eq!(info.total_stake, context.committee.total_stake());
2264        }
2265        let info_5 = dag_state
2266            .round_info(5)
2267            .expect("round 5 should still be present after flush");
2268        assert_eq!(info_5.authorities, BTreeSet::from([author_0]));
2269        assert_eq!(info_5.total_stake, context.committee.stake(author_0));
2270    }
2271
2272    #[tokio::test]
2273    async fn test_contains_blocks_in_cache_or_store() {
2274        /// Only keep elements up to 2 rounds before the last committed round
2275        const CACHED_ROUNDS: Round = 2;
2276
2277        let (mut context, _) = Context::new_for_test(4);
2278        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2279
2280        let context = Arc::new(context);
2281        let store = Arc::new(MemStore::new());
2282        let mut dag_state = DagState::new(context.clone(), store.clone());
2283
2284        // Create test blocks for round 1 ~ 10
2285        let num_rounds: u32 = 10;
2286        let num_authorities: u32 = 4;
2287        let mut blocks = Vec::new();
2288
2289        for round in 1..=num_rounds {
2290            for author in 0..num_authorities {
2291                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2292                blocks.push(block);
2293            }
2294        }
2295
2296        // Now write in store the blocks from first 4 rounds and the rest to the dag state
2297        blocks.clone().into_iter().for_each(|block| {
2298            if block.round() <= 4 {
2299                store
2300                    .write(WriteBatch::default().blocks(vec![block]))
2301                    .unwrap();
2302            } else {
2303                dag_state.accept_blocks(vec![block]);
2304            }
2305        });
2306
2307        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
2308        // where the blocks of first 4 round should be found in DagState and the rest in store.
2309        let mut block_refs = blocks
2310            .iter()
2311            .map(|block| block.reference())
2312            .collect::<Vec<_>>();
2313        let result = dag_state.contains_blocks(block_refs.clone());
2314
2315        // Ensure everything is found
2316        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
2317        assert_eq!(result, expected);
2318
2319        // Now try to ask also for one block ref that is neither in cache nor in store
2320        block_refs.insert(
2321            3,
2322            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2323        );
2324        let result = dag_state.contains_blocks(block_refs.clone());
2325
2326        // Then all should be found apart from the last one
2327        expected.insert(3, false);
2328        assert_eq!(result, expected.clone());
2329    }
2330
2331    #[tokio::test]
2332    async fn test_contains_cached_block_at_slot() {
2333        /// Only keep elements up to 2 rounds before the last committed round
2334        const CACHED_ROUNDS: Round = 2;
2335
2336        let num_authorities: u32 = 4;
2337        let (mut context, _) = Context::new_for_test(num_authorities as usize);
2338        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2339
2340        let context = Arc::new(context);
2341        let store = Arc::new(MemStore::new());
2342        let mut dag_state = DagState::new(context.clone(), store.clone());
2343
2344        // Create test blocks for round 1 ~ 10
2345        let num_rounds: u32 = 10;
2346        let mut blocks = Vec::new();
2347
2348        for round in 1..=num_rounds {
2349            for author in 0..num_authorities {
2350                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2351                blocks.push(block.clone());
2352                dag_state.accept_block(block);
2353            }
2354        }
2355
2356        // Query for genesis round 0, genesis blocks should be returned
2357        for (author, _) in context.committee.authorities() {
2358            assert!(
2359                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
2360                "Genesis should always be found"
2361            );
2362        }
2363
2364        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
2365        // where the blocks of first 4 round should be found in DagState and the rest in store.
2366        let mut block_refs = blocks
2367            .iter()
2368            .map(|block| block.reference())
2369            .collect::<Vec<_>>();
2370
2371        for block_ref in block_refs.clone() {
2372            let slot = block_ref.into();
2373            let found = dag_state.contains_cached_block_at_slot(slot);
2374            assert!(found, "A block should be found at slot {}", slot);
2375        }
2376
2377        // Now try to ask also for one block ref that is not in cache
2378        // Then all should be found apart from the last one
2379        block_refs.insert(
2380            3,
2381            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2382        );
2383        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
2384        expected.insert(3, false);
2385
2386        // Attempt to check the same for via the contains slot method
2387        for block_ref in block_refs {
2388            let slot = block_ref.into();
2389            let found = dag_state.contains_cached_block_at_slot(slot);
2390
2391            assert_eq!(expected.remove(0), found);
2392        }
2393    }
2394
2395    #[tokio::test]
2396    #[ignore]
2397    #[should_panic(
2398        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
2399    )]
2400    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
2401        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
2402        /// for the correct node operation.
2403        const GC_DEPTH: u32 = 2;
2404        /// Keep at least 3 rounds in cache for each authority.
2405        const CACHED_ROUNDS: Round = 3;
2406
2407        let (mut context, _) = Context::new_for_test(4);
2408        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2409        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2410
2411        let context = Arc::new(context);
2412        let store = Arc::new(MemStore::new());
2413        let mut dag_state = DagState::new(context.clone(), store.clone());
2414
2415        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
2416        let mut dag_builder = DagBuilder::new(context.clone());
2417        dag_builder.layers(1..=3).build();
2418        dag_builder
2419            .layers(4..=6)
2420            .authorities(vec![AuthorityIndex::new_for_test(0)])
2421            .skip_block()
2422            .build();
2423
2424        // Accept all blocks
2425        dag_builder
2426            .all_blocks()
2427            .into_iter()
2428            .for_each(|block| dag_state.accept_block(block));
2429
2430        // Now add a commit for leader round 5 to trigger an eviction
2431        dag_state.add_commit(TrustedCommit::new_for_test(
2432            1 as CommitIndex,
2433            CommitDigest::MIN,
2434            0,
2435            dag_builder.leader_block(5).unwrap().reference(),
2436            vec![],
2437        ));
2438        // Flush the DAG state to storage.
2439        dag_state.flush();
2440
2441        // Ensure that gc round has been updated
2442        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
2443
2444        // Now what we expect to happen is for:
2445        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
2446        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
2447        for authority_index in 1..=3 {
2448            for round in 4..=6 {
2449                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
2450                    round,
2451                    AuthorityIndex::new_for_test(authority_index)
2452                )));
2453            }
2454        }
2455
2456        for round in 1..=3 {
2457            assert!(
2458                dag_state.contains_cached_block_at_slot(Slot::new(
2459                    round,
2460                    AuthorityIndex::new_for_test(0)
2461                ))
2462            );
2463        }
2464
2465        // When trying to request for authority 1 at block slot 3 it should panic, as anything
2466        // that is <= 3 should be evicted
2467        let _ =
2468            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
2469    }
2470
2471    #[tokio::test]
2472    async fn test_get_blocks_in_cache_or_store() {
2473        let (context, _) = Context::new_for_test(4);
2474        let context = Arc::new(context);
2475        let store = Arc::new(MemStore::new());
2476        let mut dag_state = DagState::new(context.clone(), store.clone());
2477
2478        // Create test blocks for round 1 ~ 10
2479        let num_rounds: u32 = 10;
2480        let num_authorities: u32 = 4;
2481        let mut blocks = Vec::new();
2482
2483        for round in 1..=num_rounds {
2484            for author in 0..num_authorities {
2485                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2486                blocks.push(block);
2487            }
2488        }
2489
2490        // Now write in store the blocks from first 4 rounds and the rest to the dag state
2491        blocks.clone().into_iter().for_each(|block| {
2492            if block.round() <= 4 {
2493                store
2494                    .write(WriteBatch::default().blocks(vec![block]))
2495                    .unwrap();
2496            } else {
2497                dag_state.accept_blocks(vec![block]);
2498            }
2499        });
2500
2501        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
2502        // where the blocks of first 4 round should be found in DagState and the rest in store.
2503        let mut block_refs = blocks
2504            .iter()
2505            .map(|block| block.reference())
2506            .collect::<Vec<_>>();
2507        let result = dag_state.get_blocks(&block_refs);
2508
2509        let mut expected = blocks
2510            .into_iter()
2511            .map(Some)
2512            .collect::<Vec<Option<VerifiedBlock>>>();
2513
2514        // Ensure everything is found
2515        assert_eq!(result, expected.clone());
2516
2517        // Now try to ask also for one block ref that is neither in cache nor in store
2518        block_refs.insert(
2519            3,
2520            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
2521        );
2522        let result = dag_state.get_blocks(&block_refs);
2523
2524        // Then all should be found apart from the last one
2525        expected.insert(3, None);
2526        assert_eq!(result, expected);
2527    }
2528
    /// Flushes part of a DAG (blocks and commits) to the store, adds more blocks and
    /// commits without flushing, drops the `DagState`, and then checks what a fresh
    /// `DagState` built on the same store recovers: blocks, last commit, last committed
    /// rounds, scoring subdags, eviction rounds, and the per-block `committed` /
    /// `included` flags.
    #[tokio::test]
    async fn test_flush_and_recovery() {
        telemetry_subscribers::init_for_testing();

        const GC_DEPTH: u32 = 3;
        const CACHED_ROUNDS: u32 = 4;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build rounds 1..=20. Rounds 6..=8 are built with authority 0 skipped, so
        // authority 0 has no blocks in those rounds.
        const NUM_ROUNDS: Round = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();
        dag_builder
            .layers(6..=8)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(9..=NUM_ROUNDS).build();

        // Get the first LAST_COMMIT_INDEX commits from the DAG builder; the last of
        // them is asserted to be at round LAST_COMMIT_ROUND.
        const LAST_COMMIT_ROUND: Round = 16;
        const LAST_COMMIT_INDEX: CommitIndex = 15;
        let commits = dag_builder
            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
            .into_iter()
            .map(|(_subdag, commit)| commit)
            .take(LAST_COMMIT_INDEX as usize)
            .collect::<Vec<_>>();
        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);

        // Add the blocks from the first 12 rounds (PERSISTED_BLOCK_ROUNDS) and the first
        // 8 commits to the dag state.
        // Note that there is no commit with leader round 8, because authority 0 is the
        // leader there but produced no block. So commit 8 has leader round 9.
        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
        const NUM_PERSISTED_COMMITS: usize = 8;
        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
        let mut finalized_commits = vec![];
        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
            finalized_commits.push(commit.clone());
            dag_state.add_commit(commit);
        }
        let last_finalized_commit = finalized_commits.last().unwrap();
        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);

        // Collect the block refs referenced by the commits added so far; they are used
        // after recovery to check the `committed` flag.
        let finalized_blocks = finalized_commits
            .iter()
            .flat_map(|commit| commit.blocks())
            .collect::<BTreeSet<_>>();

        // Flush the accepted blocks and added commits from the dag state to the store.
        dag_state.flush();

        // Verify the store has blocks up to round 12, and commits up to index 8.
        // Authority 1 is used as the sample author for the block scan.
        let store_blocks = store
            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
            .unwrap();
        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
        assert_eq!(
            store_commits.last().unwrap().index(),
            LAST_PERSISTED_COMMIT_INDEX
        );
        assert_eq!(
            store_commits.last().unwrap().round(),
            LAST_PERSISTED_COMMIT_ROUND
        );

        // Add the rest of the blocks and commits to the dag state. These are NOT flushed
        // before the dag state is dropped below.
        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
            dag_state.add_commit(commit);
        }

        // All blocks should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Last commit index from DagState should now be 15
        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);

        // Destroy the dag state without flushing additional data.
        drop(dag_state);

        // Recover the state from the store
        let dag_state = DagState::new(context.clone(), store.clone());

        // Blocks from the persisted rounds should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Unpersisted blocks should not be in DagState, because they are not flushed.
        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
        let block_refs = missing_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let retrieved_blocks = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        assert!(retrieved_blocks.is_empty());

        // Recovered last commit index and round should be 8 and 9.
        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);

        // The last_commit_rounds of the finalized commits should have been recovered.
        let expected_last_committed_rounds = vec![5, 9, 8, 8];
        assert_eq!(
            dag_state.last_committed_rounds(),
            expected_last_committed_rounds
        );
        // Unscored subdags will be recovered based on the flushed commits and no commit info.
        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);

        // Ensure that cached blocks exist only for specific rounds per authority
        for (authority_index, _) in context.committee.authorities() {
            let blocks = dag_state.get_cached_blocks(authority_index, 1);

            // Ensure that eviction rounds have been properly recovered.
            // For every authority the asserted eviction round is 6, which equals the gc
            // round asserted further below (last persisted commit round 9 - GC_DEPTH 3).
            // NOTE(review): an earlier version of this comment computed the cached-round
            // bound as "12-5 = 7", but CACHED_ROUNDS is 4 in this test (12 - 4 = 8).
            // Either value is above 6, so the expectation holds if the eviction round is
            // the min of the two — confirm against the eviction formula.
            if authority_index == AuthorityIndex::new_for_test(0) {
                // Authority 0 has no blocks in rounds 6..=8, so only rounds 9..=12 remain.
                assert_eq!(blocks.len(), 4);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            } else {
                // Other authorities keep one block per round for rounds 7..=12.
                assert_eq!(blocks.len(), 6);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            }
        }

        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
        let gc_round = dag_state.gc_round();
        assert_eq!(gc_round, 6);
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
                    assert!(
                        block_info.committed,
                        "Block {:?} should be set as committed",
                        block_ref
                    );
                }
            });

        // Ensure the hard linked status of blocks (the `included` flag) is recovered.
        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
        // Other blocks (round 12 but not from authority 0) should not be hard linked.
        // This is because authority 0 blocks are considered proposed blocks.
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
                    assert!(block_info.included);
                } else {
                    assert!(!block_info.included);
                }
            });
    }
2734
2735    #[tokio::test]
2736    async fn test_block_info_as_committed() {
2737        let num_authorities: u32 = 4;
2738        let (context, _) = Context::new_for_test(num_authorities as usize);
2739        let context = Arc::new(context);
2740
2741        let store = Arc::new(MemStore::new());
2742        let mut dag_state = DagState::new(context.clone(), store.clone());
2743
2744        // Accept a block
2745        let block = VerifiedBlock::new_for_test(
2746            TestBlock::new(1, 0)
2747                .set_timestamp_ms(1000)
2748                .set_ancestors(vec![])
2749                .build(),
2750        );
2751
2752        dag_state.accept_block(block.clone());
2753
2754        // Query is committed
2755        assert!(!dag_state.is_committed(&block.reference()));
2756
2757        // Set block as committed for first time should return true
2758        assert!(
2759            dag_state.set_committed(&block.reference()),
2760            "Block should be successfully set as committed for first time"
2761        );
2762
2763        // Now it should appear as committed
2764        assert!(dag_state.is_committed(&block.reference()));
2765
2766        // Trying to set the block as committed again, it should return false.
2767        assert!(
2768            !dag_state.set_committed(&block.reference()),
2769            "Block should not be successfully set as committed"
2770        );
2771    }
2772
2773    #[tokio::test]
2774    async fn test_get_cached_blocks() {
2775        let (mut context, _) = Context::new_for_test(4);
2776        context.parameters.dag_state_cached_rounds = 5;
2777
2778        let context = Arc::new(context);
2779        let store = Arc::new(MemStore::new());
2780        let mut dag_state = DagState::new(context.clone(), store.clone());
2781
2782        // Create no blocks for authority 0
2783        // Create one block (round 10) for authority 1
2784        // Create two blocks (rounds 10,11) for authority 2
2785        // Create three blocks (rounds 10,11,12) for authority 3
2786        let mut all_blocks = Vec::new();
2787        for author in 1..=3 {
2788            for round in 10..(10 + author) {
2789                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2790                all_blocks.push(block.clone());
2791                dag_state.accept_block(block);
2792            }
2793        }
2794
2795        // Test get_cached_blocks()
2796
2797        let cached_blocks =
2798            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2799        assert!(cached_blocks.is_empty());
2800
2801        let cached_blocks =
2802            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2803        assert_eq!(cached_blocks.len(), 1);
2804        assert_eq!(cached_blocks[0].round(), 10);
2805
2806        let cached_blocks =
2807            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2808        assert_eq!(cached_blocks.len(), 2);
2809        assert_eq!(cached_blocks[0].round(), 10);
2810        assert_eq!(cached_blocks[1].round(), 11);
2811
2812        let cached_blocks =
2813            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2814        assert_eq!(cached_blocks.len(), 1);
2815        assert_eq!(cached_blocks[0].round(), 11);
2816
2817        let cached_blocks =
2818            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2819        assert_eq!(cached_blocks.len(), 3);
2820        assert_eq!(cached_blocks[0].round(), 10);
2821        assert_eq!(cached_blocks[1].round(), 11);
2822        assert_eq!(cached_blocks[2].round(), 12);
2823
2824        let cached_blocks =
2825            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2826        assert_eq!(cached_blocks.len(), 1);
2827        assert_eq!(cached_blocks[0].round(), 12);
2828
2829        // Test get_cached_blocks_in_range()
2830
2831        // Start == end
2832        let cached_blocks = dag_state.get_cached_blocks_in_range(
2833            context.committee.to_authority_index(3).unwrap(),
2834            10,
2835            10,
2836            1,
2837        );
2838        assert!(cached_blocks.is_empty());
2839
2840        // Start > end
2841        let cached_blocks = dag_state.get_cached_blocks_in_range(
2842            context.committee.to_authority_index(3).unwrap(),
2843            11,
2844            10,
2845            1,
2846        );
2847        assert!(cached_blocks.is_empty());
2848
2849        // Empty result.
2850        let cached_blocks = dag_state.get_cached_blocks_in_range(
2851            context.committee.to_authority_index(0).unwrap(),
2852            9,
2853            10,
2854            1,
2855        );
2856        assert!(cached_blocks.is_empty());
2857
2858        // Single block, one round before the end.
2859        let cached_blocks = dag_state.get_cached_blocks_in_range(
2860            context.committee.to_authority_index(1).unwrap(),
2861            9,
2862            11,
2863            1,
2864        );
2865        assert_eq!(cached_blocks.len(), 1);
2866        assert_eq!(cached_blocks[0].round(), 10);
2867
2868        // Respect end round.
2869        let cached_blocks = dag_state.get_cached_blocks_in_range(
2870            context.committee.to_authority_index(2).unwrap(),
2871            9,
2872            12,
2873            5,
2874        );
2875        assert_eq!(cached_blocks.len(), 2);
2876        assert_eq!(cached_blocks[0].round(), 10);
2877        assert_eq!(cached_blocks[1].round(), 11);
2878
2879        // Respect start round.
2880        let cached_blocks = dag_state.get_cached_blocks_in_range(
2881            context.committee.to_authority_index(3).unwrap(),
2882            11,
2883            20,
2884            5,
2885        );
2886        assert_eq!(cached_blocks.len(), 2);
2887        assert_eq!(cached_blocks[0].round(), 11);
2888        assert_eq!(cached_blocks[1].round(), 12);
2889
2890        // Respect limit
2891        let cached_blocks = dag_state.get_cached_blocks_in_range(
2892            context.committee.to_authority_index(3).unwrap(),
2893            10,
2894            20,
2895            1,
2896        );
2897        assert_eq!(cached_blocks.len(), 1);
2898        assert_eq!(cached_blocks[0].round(), 10);
2899    }
2900
2901    #[tokio::test]
2902    async fn test_get_last_cached_block() {
2903        // GIVEN
2904        const CACHED_ROUNDS: Round = 2;
2905        const GC_DEPTH: u32 = 1;
2906        let (mut context, _) = Context::new_for_test(4);
2907        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2908        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2909
2910        let context = Arc::new(context);
2911        let store = Arc::new(MemStore::new());
2912        let mut dag_state = DagState::new(context.clone(), store.clone());
2913
2914        // Create no blocks for authority 0
2915        // Create one block (round 1) for authority 1
2916        // Create two blocks (rounds 1,2) for authority 2
2917        // Create three blocks (rounds 1,2,3) for authority 3
2918        let dag_str = "DAG {
2919            Round 0 : { 4 },
2920            Round 1 : {
2921                B -> [*],
2922                C -> [*],
2923                D -> [*],
2924            },
2925            Round 2 : {
2926                C -> [*],
2927                D -> [*],
2928            },
2929            Round 3 : {
2930                D -> [*],
2931            },
2932        }";
2933
2934        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2935
2936        // Add equivocating block for round 2 authority 3
2937        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2938
2939        // Accept all blocks
2940        for block in dag_builder
2941            .all_blocks()
2942            .into_iter()
2943            .chain(std::iter::once(block))
2944        {
2945            dag_state.accept_block(block);
2946        }
2947
2948        dag_state.add_commit(TrustedCommit::new_for_test(
2949            1 as CommitIndex,
2950            CommitDigest::MIN,
2951            context.clock.timestamp_utc_ms(),
2952            dag_builder.leader_block(3).unwrap().reference(),
2953            vec![],
2954        ));
2955
2956        // WHEN search for the latest blocks
2957        let end_round = 4;
2958        let expected_rounds = vec![0, 1, 2, 3];
2959        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2960        // THEN
2961        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2962        assert_eq!(
2963            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2964            expected_rounds
2965        );
2966        assert_eq!(
2967            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2968            expected_excluded_and_equivocating_blocks
2969        );
2970
2971        // THEN
2972        for (i, expected_round) in expected_rounds.iter().enumerate() {
2973            let round = dag_state
2974                .get_last_cached_block_in_range(
2975                    context.committee.to_authority_index(i).unwrap(),
2976                    0,
2977                    end_round,
2978                )
2979                .map(|b| b.round())
2980                .unwrap_or_default();
2981            assert_eq!(round, *expected_round, "Authority {i}");
2982        }
2983
2984        // WHEN starting from round 2
2985        let start_round = 2;
2986        let expected_rounds = [0, 0, 2, 3];
2987
2988        // THEN
2989        for (i, expected_round) in expected_rounds.iter().enumerate() {
2990            let round = dag_state
2991                .get_last_cached_block_in_range(
2992                    context.committee.to_authority_index(i).unwrap(),
2993                    start_round,
2994                    end_round,
2995                )
2996                .map(|b| b.round())
2997                .unwrap_or_default();
2998            assert_eq!(round, *expected_round, "Authority {i}");
2999        }
3000
3001        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
3002        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
3003        //
3004        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
3005        // all their highest round blocks for CACHED_ROUNDS.
3006        dag_state.flush();
3007
3008        // AND we request before round 3
3009        let end_round = 3;
3010        let expected_rounds = vec![0, 1, 2, 2];
3011
3012        // THEN
3013        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
3014        assert_eq!(
3015            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
3016            expected_rounds
3017        );
3018
3019        // THEN
3020        for (i, expected_round) in expected_rounds.iter().enumerate() {
3021            let round = dag_state
3022                .get_last_cached_block_in_range(
3023                    context.committee.to_authority_index(i).unwrap(),
3024                    0,
3025                    end_round,
3026                )
3027                .map(|b| b.round())
3028                .unwrap_or_default();
3029            assert_eq!(round, *expected_round, "Authority {i}");
3030        }
3031    }
3032
3033    #[tokio::test]
3034    #[should_panic(
3035        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
3036    )]
3037    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
3038        // GIVEN
3039        const CACHED_ROUNDS: Round = 1;
3040        const GC_DEPTH: u32 = 1;
3041        let (mut context, _) = Context::new_for_test(4);
3042        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
3043        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
3044
3045        let context = Arc::new(context);
3046        let store = Arc::new(MemStore::new());
3047        let mut dag_state = DagState::new(context.clone(), store.clone());
3048
3049        // Create no blocks for authority 0
3050        // Create one block (round 1) for authority 1
3051        // Create two blocks (rounds 1,2) for authority 2
3052        // Create three blocks (rounds 1,2,3) for authority 3
3053        let mut dag_builder = DagBuilder::new(context.clone());
3054        dag_builder
3055            .layers(1..=1)
3056            .authorities(vec![AuthorityIndex::new_for_test(0)])
3057            .skip_block()
3058            .build();
3059        dag_builder
3060            .layers(2..=2)
3061            .authorities(vec![
3062                AuthorityIndex::new_for_test(0),
3063                AuthorityIndex::new_for_test(1),
3064            ])
3065            .skip_block()
3066            .build();
3067        dag_builder
3068            .layers(3..=3)
3069            .authorities(vec![
3070                AuthorityIndex::new_for_test(0),
3071                AuthorityIndex::new_for_test(1),
3072                AuthorityIndex::new_for_test(2),
3073            ])
3074            .skip_block()
3075            .build();
3076
3077        // Accept all blocks
3078        for block in dag_builder.all_blocks() {
3079            dag_state.accept_block(block);
3080        }
3081
3082        dag_state.add_commit(TrustedCommit::new_for_test(
3083            1 as CommitIndex,
3084            CommitDigest::MIN,
3085            0,
3086            dag_builder.leader_block(3).unwrap().reference(),
3087            vec![],
3088        ));
3089
3090        // Flush the store so we update the evict rounds
3091        dag_state.flush();
3092
3093        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
3094        dag_state.get_last_cached_block_per_authority(2);
3095    }
3096
3097    #[tokio::test]
3098    async fn test_last_quorum() {
3099        // GIVEN
3100        let (context, _) = Context::new_for_test(4);
3101        let context = Arc::new(context);
3102        let store = Arc::new(MemStore::new());
3103        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3104
3105        // WHEN no blocks exist then genesis should be returned
3106        {
3107            let genesis = genesis_blocks(context.as_ref());
3108
3109            assert_eq!(dag_state.read().last_quorum(), genesis);
3110        }
3111
3112        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
3113        {
3114            let mut dag_builder = DagBuilder::new(context.clone());
3115            dag_builder
3116                .layers(1..=4)
3117                .build()
3118                .persist_layers(dag_state.clone());
3119            let round_4_blocks: Vec<_> = dag_builder
3120                .blocks(4..=4)
3121                .into_iter()
3122                .map(|block| block.reference())
3123                .collect();
3124
3125            let last_quorum = dag_state.read().last_quorum();
3126
3127            assert_eq!(
3128                last_quorum
3129                    .into_iter()
3130                    .map(|block| block.reference())
3131                    .collect::<Vec<_>>(),
3132                round_4_blocks
3133            );
3134        }
3135
3136        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
3137        {
3138            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
3139            dag_state.write().accept_block(block);
3140
3141            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
3142
3143            let last_quorum = dag_state.read().last_quorum();
3144
3145            assert_eq!(last_quorum, round_4_blocks);
3146        }
3147    }
3148
3149    #[tokio::test]
3150    async fn test_last_block_for_authority() {
3151        // GIVEN
3152        let (context, _) = Context::new_for_test(4);
3153        let context = Arc::new(context);
3154        let store = Arc::new(MemStore::new());
3155        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3156
3157        // WHEN no blocks exist then genesis should be returned
3158        {
3159            let genesis = genesis_blocks(context.as_ref());
3160            let my_genesis = genesis
3161                .into_iter()
3162                .find(|block| block.author() == context.own_index)
3163                .unwrap();
3164
3165            assert_eq!(dag_state.read().get_last_proposed_block(), Some(my_genesis));
3166        }
3167
3168        // WHEN adding some blocks for authorities, only the last ones should be returned
3169        {
3170            // add blocks up to round 4
3171            let mut dag_builder = DagBuilder::new(context.clone());
3172            dag_builder
3173                .layers(1..=4)
3174                .build()
3175                .persist_layers(dag_state.clone());
3176
3177            // add block 5 for authority 0
3178            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
3179            dag_state.write().accept_block(block);
3180
3181            let block = dag_state
3182                .read()
3183                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
3184            assert_eq!(block.round(), 5);
3185
3186            for (authority_index, _) in context.committee.authorities() {
3187                let block = dag_state
3188                    .read()
3189                    .get_last_block_for_authority(authority_index);
3190
3191                if authority_index.value() == 0 {
3192                    assert_eq!(block.round(), 5);
3193                } else {
3194                    assert_eq!(block.round(), 4);
3195                }
3196            }
3197        }
3198    }
3199
3200    #[tokio::test]
3201    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
3202        // GIVEN
3203        let (context, _) = Context::new_for_test(4);
3204        let context = Arc::new(context);
3205        let store = Arc::new(MemStore::new());
3206        let mut dag_state = DagState::new(context.clone(), store.clone());
3207
3208        // Set a timestamp for the block that is ahead of the current time
3209        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
3210
3211        let block = VerifiedBlock::new_for_test(
3212            TestBlock::new(10, 0)
3213                .set_timestamp_ms(block_timestamp)
3214                .build(),
3215        );
3216
3217        // Try to accept the block - it should not panic
3218        dag_state.accept_block(block);
3219    }
3220
3221    #[tokio::test]
3222    async fn test_last_finalized_commit() {
3223        // GIVEN
3224        let (context, _) = Context::new_for_test(4);
3225        let context = Arc::new(context);
3226        let store = Arc::new(MemStore::new());
3227        let mut dag_state = DagState::new(context.clone(), store.clone());
3228
3229        // WHEN adding a finalized commit
3230        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
3231        let rejected_transactions = BTreeMap::new();
3232        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
3233
3234        // THEN the commit should be added to the buffer
3235        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
3236        assert_eq!(
3237            dag_state.finalized_commits_to_write[0],
3238            (commit_ref, rejected_transactions.clone())
3239        );
3240
3241        // WHEN flushing the DAG state
3242        dag_state.flush();
3243
3244        // THEN the commit and rejected transactions should be written to storage
3245        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
3246        assert_eq!(last_finalized_commit, Some(commit_ref));
3247        let stored_rejected_transactions = store
3248            .read_rejected_transactions(commit_ref)
3249            .unwrap()
3250            .unwrap();
3251        assert_eq!(stored_rejected_transactions, rejected_transactions);
3252    }
3253}