consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use mysten_common::ZipDebugEqIteratorExt;
18use tokio::time::Instant;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22    CommittedSubDag,
23    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
24    commit::{
25        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
26        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
27    },
28    context::Context,
29    leader_scoring::{ReputationScores, ScoringSubdag},
30    storage::{Store, WriteBatch},
31    threshold_clock::ThresholdClock,
32};
33
/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
///
/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
/// concurrent access from multiple components.
pub struct DagState {
    context: Arc<Context>,

    // The genesis blocks, keyed by their block reference.
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
    // Note: all uncommitted blocks are kept in memory.
    //
    // When GC is enabled, this map has different semantics. It holds all the recent data for each
    // authority, making sure that it always has CACHED_ROUNDS worth of data available. The entries
    // are evicted based on the latest GC round, however the eviction process will respect the
    // CACHED_ROUNDS. For each authority, blocks are only evicted when their round is less than or
    // equal to both `gc_round`, and `highest authority round - cached rounds`.
    // This ensures that the GC requirements are respected (we never clean up any block above
    // `gc_round`), and there are enough blocks cached.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    // Indexes recent block refs by their authorities.
    // Vec position corresponds to the authority index.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    // Keeps track of the threshold clock for proposing blocks.
    threshold_clock: ThresholdClock,

    // Keeps track of the highest round that has been evicted for each authority. Any blocks that
    // are of round <= evict_round should be considered evicted, and if any exist we should not
    // consider them causally complete in the order they appear.
    // The `evicted_rounds` size should be the same as the committee size.
    evicted_rounds: Vec<Round>,

    // Highest round of blocks accepted.
    highest_accepted_round: Round,

    // Last consensus commit of the dag. `None` when no commit has been read from storage or made.
    last_commit: Option<TrustedCommit>,

    // Last wall time when commit round advanced. Does not persist across restarts.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    // Last committed rounds per authority.
    // Vec position corresponds to the authority index.
    last_committed_rounds: Vec<Round>,

    /// The committed subdags that have been scored but scores have not been used
    /// for leader schedule yet.
    scoring_subdag: ScoringSubdag,

    // Commit votes pending to be included in new blocks.
    // TODO: limit to 1st commit per round with multi-leader.
    // TODO: recover unproposed pending commit votes at startup.
    pending_commit_votes: VecDeque<CommitVote>,

    // Blocks and commits must be buffered for persistence before they can be
    // inserted into the local DAG or sent to output.
    blocks_to_write: Vec<VerifiedBlock>,
    commits_to_write: Vec<TrustedCommit>,

    // Buffers the reputation scores & last_committed_rounds to be flushed with the
    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
    // & last_committed_rounds from the commits as needed.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    // Buffers finalized commits and their rejected transactions to be written to storage.
    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,

    // Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,

    // The number of cached rounds, taken from `parameters.dag_state_cached_rounds`.
    cached_rounds: Round,
}
108
109impl DagState {
110    /// Initializes DagState from storage.
111    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
112        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
113        let num_authorities = context.committee.size();
114
115        let genesis = genesis_blocks(context.as_ref())
116            .into_iter()
117            .map(|block| (block.reference(), block))
118            .collect();
119
120        let threshold_clock = ThresholdClock::new(1, context.clone());
121
122        let last_commit = store
123            .read_last_commit()
124            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
125
126        let commit_info = store
127            .read_last_commit_info()
128            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
129        let (mut last_committed_rounds, commit_recovery_start_index) =
130            if let Some((commit_ref, commit_info)) = commit_info {
131                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
132                (commit_info.committed_rounds, commit_ref.index + 1)
133            } else {
134                tracing::info!("Found no stored CommitInfo to recover from");
135                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
136            };
137
138        let mut unscored_committed_subdags = Vec::new();
139        let mut scoring_subdag = ScoringSubdag::new(context.clone());
140
141        if let Some(last_commit) = last_commit.as_ref() {
142            store
143                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
144                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
145                .iter()
146                .for_each(|commit| {
147                    for block_ref in commit.blocks() {
148                        last_committed_rounds[block_ref.author] =
149                            max(last_committed_rounds[block_ref.author], block_ref.round);
150                    }
151                    let committed_subdag =
152                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
153                    // We don't need to recover reputation scores for unscored_committed_subdags
154                    unscored_committed_subdags.push(committed_subdag);
155                });
156        }
157
158        tracing::info!(
159            "DagState was initialized with the following state: \
160            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
161            unscored_committed_subdags.len()
162        );
163
164        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
165
166        let mut state = Self {
167            context: context.clone(),
168            genesis,
169            recent_blocks: BTreeMap::new(),
170            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
171            threshold_clock,
172            highest_accepted_round: 0,
173            last_commit: last_commit.clone(),
174            last_commit_round_advancement_time: None,
175            last_committed_rounds: last_committed_rounds.clone(),
176            pending_commit_votes: VecDeque::new(),
177            blocks_to_write: vec![],
178            commits_to_write: vec![],
179            commit_info_to_write: vec![],
180            finalized_commits_to_write: vec![],
181            scoring_subdag,
182            store: store.clone(),
183            cached_rounds,
184            evicted_rounds: vec![0; num_authorities],
185        };
186
187        for (authority_index, _) in context.committee.authorities() {
188            let (blocks, eviction_round) = {
189                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
190                // As reminder, the eviction round is taking into account the gc_round.
191                let last_block = state
192                    .store
193                    .scan_last_blocks_by_author(authority_index, 1, None)
194                    .expect("Database error");
195                let last_block_round = last_block
196                    .last()
197                    .map(|b| b.round())
198                    .unwrap_or(GENESIS_ROUND);
199
200                let eviction_round =
201                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
202                let blocks = state
203                    .store
204                    .scan_blocks_by_author(authority_index, eviction_round + 1)
205                    .expect("Database error");
206
207                (blocks, eviction_round)
208            };
209
210            state.evicted_rounds[authority_index] = eviction_round;
211
212            // Update the block metadata for the authority.
213            for block in &blocks {
214                state.update_block_metadata(block);
215            }
216
217            debug!(
218                "Recovered blocks {}: {:?}",
219                authority_index,
220                blocks
221                    .iter()
222                    .map(|b| b.reference())
223                    .collect::<Vec<BlockRef>>()
224            );
225        }
226
227        if let Some(last_commit) = last_commit {
228            let mut index = last_commit.index();
229            let gc_round = state.gc_round();
230            info!(
231                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
232                index, gc_round
233            );
234
235            loop {
236                let commits = store
237                    .scan_commits((index..=index).into())
238                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
239                let Some(commit) = commits.first() else {
240                    info!("Recovering finished up to index {index}, no more commits to recover");
241                    break;
242                };
243
244                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
245                if gc_round > 0 && commit.leader().round <= gc_round {
246                    info!(
247                        "Recovering finished, reached commit leader round {} <= gc_round {}",
248                        commit.leader().round,
249                        gc_round
250                    );
251                    break;
252                }
253
254                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
255                    debug!(
256                        "Setting block {:?} as committed based on commit {:?}",
257                        block_ref,
258                        commit.index()
259                    );
260                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
261                });
262
263                // All commits are indexed starting from 1, so one reach zero exit.
264                index = index.saturating_sub(1);
265                if index == 0 {
266                    break;
267                }
268            }
269        }
270
271        // Recover hard linked statuses for blocks within GC round.
272        let proposed_blocks = store
273            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
274            .expect("Database error");
275        for block in proposed_blocks {
276            state.link_causal_history(block.reference());
277        }
278
279        state
280    }
281
282    /// Accepts a block into DagState and keeps it in memory.
283    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
284        assert_ne!(
285            block.round(),
286            0,
287            "Genesis block should not be accepted into DAG."
288        );
289
290        let block_ref = block.reference();
291        if self.contains_block(&block_ref) {
292            return;
293        }
294
295        let now = self.context.clock.timestamp_utc_ms();
296        if block.timestamp_ms() > now {
297            trace!(
298                "Block {:?} with timestamp {} is greater than local timestamp {}.",
299                block,
300                block.timestamp_ms(),
301                now,
302            );
303        }
304        let hostname = &self.context.committee.authority(block_ref.author).hostname;
305        self.context
306            .metrics
307            .node_metrics
308            .accepted_block_time_drift_ms
309            .with_label_values(&[hostname])
310            .inc_by(block.timestamp_ms().saturating_sub(now));
311
312        // TODO: Move this check to core
313        // Ensure we don't write multiple blocks per slot for our own index
314        if block_ref.author == self.context.own_index {
315            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
316            if !self
317                .context
318                .parameters
319                .internal
320                .skip_equivocation_validation
321            {
322                assert!(
323                    existing_blocks.is_empty(),
324                    "Block Rejected! Attempted to add block {block:#?} to own slot where \
325                    block(s) {existing_blocks:#?} already exists."
326                );
327            }
328        }
329        self.update_block_metadata(&block);
330        self.blocks_to_write.push(block);
331        let source = if self.context.own_index == block_ref.author {
332            "own"
333        } else {
334            "others"
335        };
336        self.context
337            .metrics
338            .node_metrics
339            .accepted_blocks
340            .with_label_values(&[source])
341            .inc();
342    }
343
344    /// Updates internal metadata for a block.
345    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
346        let block_ref = block.reference();
347        self.recent_blocks
348            .insert(block_ref, BlockInfo::new(block.clone()));
349        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
350
351        if self.threshold_clock.add_block(block_ref) {
352            // Do not measure quorum delay when no local block is proposed in the round.
353            let last_proposed_block = self.get_last_proposed_block();
354            if last_proposed_block.round() == block_ref.round {
355                let quorum_delay_ms = self
356                    .context
357                    .clock
358                    .timestamp_utc_ms()
359                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
360                self.context
361                    .metrics
362                    .node_metrics
363                    .quorum_receive_latency
364                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
365            }
366        }
367
368        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
369        self.context
370            .metrics
371            .node_metrics
372            .highest_accepted_round
373            .set(self.highest_accepted_round as i64);
374
375        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
376            .last()
377            .map(|block_ref| block_ref.round)
378            .expect("There should be by now at least one block ref");
379        let hostname = &self.context.committee.authority(block_ref.author).hostname;
380        self.context
381            .metrics
382            .node_metrics
383            .highest_accepted_authority_round
384            .with_label_values(&[hostname])
385            .set(highest_accepted_round_for_author as i64);
386    }
387
388    /// Accepts a blocks into DagState and keeps it in memory.
389    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
390        debug!(
391            "Accepting blocks: {}",
392            blocks.iter().map(|b| b.reference().to_string()).join(",")
393        );
394        for block in blocks {
395            self.accept_block(block);
396        }
397    }
398
399    /// Gets a block by checking cached recent blocks then storage.
400    /// Returns None when the block is not found.
401    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
402        self.get_blocks(&[*reference])
403            .pop()
404            .expect("Exactly one element should be returned")
405    }
406
407    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
408    /// An element is None when the corresponding block is not found.
409    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
410        if block_refs.is_empty() {
411            return vec![];
412        }
413
414        let mut blocks = vec![None; block_refs.len()];
415        let mut missing = Vec::new();
416
417        for (index, block_ref) in block_refs.iter().enumerate() {
418            if block_ref.round == GENESIS_ROUND {
419                // Allow the caller to handle the invalid genesis ancestor error.
420                if let Some(block) = self.genesis.get(block_ref) {
421                    blocks[index] = Some(block.clone());
422                }
423                continue;
424            }
425            if let Some(block_info) = self.recent_blocks.get(block_ref) {
426                blocks[index] = Some(block_info.block.clone());
427                continue;
428            }
429            missing.push((index, block_ref));
430        }
431
432        if missing.is_empty() {
433            return blocks;
434        }
435
436        let missing_refs = missing
437            .iter()
438            .map(|(_, block_ref)| **block_ref)
439            .collect::<Vec<_>>();
440        let store_results = self
441            .store
442            .read_blocks(&missing_refs)
443            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
444        self.context
445            .metrics
446            .node_metrics
447            .dag_state_store_read_count
448            .with_label_values(&["get_blocks"])
449            .inc();
450
451        for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
452            blocks[index] = result;
453        }
454
455        blocks
456    }
457
458    /// Gets all uncommitted blocks in a slot.
459    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
460    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
461        // TODO: either panic below when the slot is at or below the last committed round,
462        // or support reading from storage while limiting storage reads to edge cases.
463
464        let mut blocks = vec![];
465        for (_block_ref, block_info) in self.recent_blocks.range((
466            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
467            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
468        )) {
469            blocks.push(block_info.block.clone())
470        }
471        blocks
472    }
473
474    /// Gets all uncommitted blocks in a round.
475    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
476    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
477        if round <= self.last_commit_round() {
478            panic!("Round {} have committed blocks!", round);
479        }
480
481        let mut blocks = vec![];
482        for (_block_ref, block_info) in self.recent_blocks.range((
483            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
484            Excluded(BlockRef::new(
485                round + 1,
486                AuthorityIndex::ZERO,
487                BlockDigest::MIN,
488            )),
489        )) {
490            blocks.push(block_info.block.clone())
491        }
492        blocks
493    }
494
495    /// Gets all ancestors in the history of a block at a certain round.
496    pub(crate) fn ancestors_at_round(
497        &self,
498        later_block: &VerifiedBlock,
499        earlier_round: Round,
500    ) -> Vec<VerifiedBlock> {
501        // Iterate through ancestors of later_block in round descending order.
502        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
503        while !linked.is_empty() {
504            let round = linked.last().unwrap().round;
505            // Stop after finishing traversal for ancestors above earlier_round.
506            if round <= earlier_round {
507                break;
508            }
509            let block_ref = linked.pop_last().unwrap();
510            let Some(block) = self.get_block(&block_ref) else {
511                panic!("Block {:?} should exist in DAG!", block_ref);
512            };
513            linked.extend(block.ancestors().iter().cloned());
514        }
515        linked
516            .range((
517                Included(BlockRef::new(
518                    earlier_round,
519                    AuthorityIndex::ZERO,
520                    BlockDigest::MIN,
521                )),
522                Unbounded,
523            ))
524            .map(|r| {
525                self.get_block(r)
526                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
527                    .clone()
528            })
529            .collect()
530    }
531
532    /// Gets the last proposed block from this authority.
533    /// If no block is proposed yet, returns the genesis block.
534    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
535        self.get_last_block_for_authority(self.context.own_index)
536    }
537
538    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
539    /// then the genesis block is returned as no other block has been received from that authority.
540    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
541        if let Some(last) = self.recent_refs_by_authority[authority].last() {
542            return self
543                .recent_blocks
544                .get(last)
545                .expect("Block should be found in recent blocks")
546                .block
547                .clone();
548        }
549
550        // if none exists, then fallback to genesis
551        let (_, genesis_block) = self
552            .genesis
553            .iter()
554            .find(|(block_ref, _)| block_ref.author == authority)
555            .expect("Genesis should be found for authority {authority_index}");
556        genesis_block.clone()
557    }
558
559    /// Returns cached recent blocks from the specified authority.
560    /// Blocks returned are limited to round >= `start`, and cached.
561    /// NOTE: caller should not assume returned blocks are always chained.
562    /// "Disconnected" blocks can be returned when there are byzantine blocks,
563    /// or a previously evicted block is accepted again.
564    pub(crate) fn get_cached_blocks(
565        &self,
566        authority: AuthorityIndex,
567        start: Round,
568    ) -> Vec<VerifiedBlock> {
569        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
570    }
571
572    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
573    // limited in total number of blocks.
574    pub(crate) fn get_cached_blocks_in_range(
575        &self,
576        authority: AuthorityIndex,
577        start_round: Round,
578        end_round: Round,
579        limit: usize,
580    ) -> Vec<VerifiedBlock> {
581        if start_round >= end_round || limit == 0 {
582            return vec![];
583        }
584
585        let mut blocks = vec![];
586        for block_ref in self.recent_refs_by_authority[authority].range((
587            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
588            Excluded(BlockRef::new(
589                end_round,
590                AuthorityIndex::MIN,
591                BlockDigest::MIN,
592            )),
593        )) {
594            let block_info = self
595                .recent_blocks
596                .get(block_ref)
597                .expect("Block should exist in recent blocks");
598            blocks.push(block_info.block.clone());
599            if blocks.len() >= limit {
600                break;
601            }
602        }
603        blocks
604    }
605
606    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
607    pub(crate) fn get_last_cached_block_in_range(
608        &self,
609        authority: AuthorityIndex,
610        start_round: Round,
611        end_round: Round,
612    ) -> Option<VerifiedBlock> {
613        if start_round >= end_round {
614            return None;
615        }
616
617        let block_ref = self.recent_refs_by_authority[authority]
618            .range((
619                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
620                Excluded(BlockRef::new(
621                    end_round,
622                    AuthorityIndex::MIN,
623                    BlockDigest::MIN,
624                )),
625            ))
626            .last()?;
627
628        self.recent_blocks
629            .get(block_ref)
630            .map(|block_info| block_info.block.clone())
631    }
632
633    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
634    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
635    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
636    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
637    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
638    /// and the other equivocating blocks will be returned.
639    pub(crate) fn get_last_cached_block_per_authority(
640        &self,
641        end_round: Round,
642    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
643        // Initialize with the genesis blocks as fallback
644        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
645        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
646
647        if end_round == GENESIS_ROUND {
648            panic!(
649                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
650            );
651        }
652
653        if end_round == GENESIS_ROUND + 1 {
654            return blocks.into_iter().map(|b| (b, vec![])).collect();
655        }
656
657        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
658            let authority_index = self
659                .context
660                .committee
661                .to_authority_index(authority_index)
662                .unwrap();
663
664            let last_evicted_round = self.evicted_rounds[authority_index];
665            if end_round.saturating_sub(1) <= last_evicted_round {
666                panic!(
667                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
668                );
669            }
670
671            let block_ref_iter = block_refs
672                .range((
673                    Included(BlockRef::new(
674                        last_evicted_round + 1,
675                        authority_index,
676                        BlockDigest::MIN,
677                    )),
678                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
679                ))
680                .rev();
681
682            let mut last_round = 0;
683            for block_ref in block_ref_iter {
684                if last_round == 0 {
685                    last_round = block_ref.round;
686                    let block_info = self
687                        .recent_blocks
688                        .get(block_ref)
689                        .expect("Block should exist in recent blocks");
690                    blocks[authority_index] = block_info.block.clone();
691                    continue;
692                }
693                if block_ref.round < last_round {
694                    break;
695                }
696                equivocating_blocks[authority_index].push(*block_ref);
697            }
698        }
699
700        blocks
701            .into_iter()
702            .zip_debug_eq(equivocating_blocks)
703            .collect()
704    }
705
706    /// Checks whether a block exists in the slot. The method checks only against the cached data.
707    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
708    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
709        // Always return true for genesis slots.
710        if slot.round == GENESIS_ROUND {
711            return true;
712        }
713
714        let eviction_round = self.evicted_rounds[slot.authority];
715        if slot.round <= eviction_round {
716            panic!(
717                "{}",
718                format!(
719                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
720                )
721            );
722        }
723
724        let mut result = self.recent_refs_by_authority[slot.authority].range((
725            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
726            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
727        ));
728        result.next().is_some()
729    }
730
731    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
732    /// back the results, so its expensive if keep asking for cache missing blocks.
733    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
734        let mut exist = vec![false; block_refs.len()];
735        let mut missing = Vec::new();
736
737        for (index, block_ref) in block_refs.into_iter().enumerate() {
738            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
739            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
740                exist[index] = true;
741            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
742            {
743                // Optimization: recent_refs contain the most recent blocks known to this authority.
744                // If a block ref is not found there and has a higher round, it definitely is
745                // missing from this authority and there is no need to check disk.
746                exist[index] = false;
747            } else {
748                missing.push((index, block_ref));
749            }
750        }
751
752        if missing.is_empty() {
753            return exist;
754        }
755
756        let missing_refs = missing
757            .iter()
758            .map(|(_, block_ref)| *block_ref)
759            .collect::<Vec<_>>();
760        let store_results = self
761            .store
762            .contains_blocks(&missing_refs)
763            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
764        self.context
765            .metrics
766            .node_metrics
767            .dag_state_store_read_count
768            .with_label_values(&["contains_blocks"])
769            .inc();
770
771        for ((index, _), result) in missing.into_iter().zip_debug_eq(store_results.into_iter()) {
772            exist[index] = result;
773        }
774
775        exist
776    }
777
778    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
779        let blocks = self.contains_blocks(vec![*block_ref]);
780        blocks.first().cloned().unwrap()
781    }
782
783    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
784    // Method will panic if the block is not found in the cache.
785    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
786        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
787            if !block_info.committed {
788                block_info.committed = true;
789                return true;
790            }
791            false
792        } else {
793            panic!(
794                "Block {:?} not found in cache to set as committed.",
795                block_ref
796            );
797        }
798    }
799
800    /// Returns true if the block is committed. Only valid for blocks above the GC round.
801    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
802        self.recent_blocks
803            .get(block_ref)
804            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
805            .committed
806    }
807
808    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
809    /// Returns the list of blocks that are newly linked.
810    /// The returned blocks are guaranteed to be above the GC round.
811    /// Transaction votes for the returned blocks are retrieved and carried by the upcoming
812    /// proposed block.
813    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
814        let gc_round = self.gc_round();
815        let mut linked_blocks = vec![];
816        let mut targets = VecDeque::new();
817        targets.push_back(root_block);
818        while let Some(block_ref) = targets.pop_front() {
819            // No need to collect or mark blocks at or below GC round.
820            // These blocks and their causal history will not be included in new commits.
821            // And their transactions do not need votes to finalize or skip.
822            //
823            // CommitFinalizer::gced_transaction_votes_for_pending_block() is the counterpart
824            // to this logic, when deciding if block A in the causal history of block B gets
825            // implicit accept transaction votes from block B.
826            if block_ref.round <= gc_round {
827                continue;
828            }
829            let block_info = self
830                .recent_blocks
831                .get_mut(&block_ref)
832                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
833            if block_info.included {
834                continue;
835            }
836            linked_blocks.push(block_ref);
837            block_info.included = true;
838            targets.extend(block_info.block.ancestors().iter());
839        }
840        linked_blocks
841    }
842
843    /// Returns true if the block has been included in an owned proposed block.
844    /// NOTE: caller should make sure only blocks above GC round are queried.
845    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
846        self.recent_blocks
847            .get(block_ref)
848            .unwrap_or_else(|| {
849                panic!(
850                    "Attempted to query for inclusion status for a block not in cached data {}",
851                    block_ref
852                )
853            })
854            .included
855    }
856
857    pub(crate) fn threshold_clock_round(&self) -> Round {
858        self.threshold_clock.get_round()
859    }
860
861    // The timestamp of when quorum threshold was last reached in the threshold clock.
862    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
863        self.threshold_clock.get_quorum_ts()
864    }
865
866    pub(crate) fn highest_accepted_round(&self) -> Round {
867        self.highest_accepted_round
868    }
869
870    // Buffers a new commit in memory and updates last committed rounds.
871    // REQUIRED: must not skip over any commit index.
872    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
873        let time_diff = if let Some(last_commit) = &self.last_commit {
874            if commit.index() <= last_commit.index() {
875                error!(
876                    "New commit index {} <= last commit index {}!",
877                    commit.index(),
878                    last_commit.index()
879                );
880                return;
881            }
882            assert_eq!(commit.index(), last_commit.index() + 1);
883
884            if commit.timestamp_ms() < last_commit.timestamp_ms() {
885                panic!(
886                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
887                    last_commit, commit
888                );
889            }
890            commit
891                .timestamp_ms()
892                .saturating_sub(last_commit.timestamp_ms())
893        } else {
894            assert_eq!(commit.index(), 1);
895            0
896        };
897
898        self.context
899            .metrics
900            .node_metrics
901            .last_commit_time_diff
902            .observe(time_diff as f64);
903
904        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
905            previous_commit.round() < commit.round()
906        } else {
907            true
908        };
909
910        self.last_commit = Some(commit.clone());
911
912        if commit_round_advanced {
913            let now = std::time::Instant::now();
914            if let Some(previous_time) = self.last_commit_round_advancement_time {
915                self.context
916                    .metrics
917                    .node_metrics
918                    .commit_round_advancement_interval
919                    .observe(now.duration_since(previous_time).as_secs_f64())
920            }
921            self.last_commit_round_advancement_time = Some(now);
922        }
923
924        for block_ref in commit.blocks().iter() {
925            self.last_committed_rounds[block_ref.author] = max(
926                self.last_committed_rounds[block_ref.author],
927                block_ref.round,
928            );
929        }
930
931        for (i, round) in self.last_committed_rounds.iter().enumerate() {
932            let index = self.context.committee.to_authority_index(i).unwrap();
933            let hostname = &self.context.committee.authority(index).hostname;
934            self.context
935                .metrics
936                .node_metrics
937                .last_committed_authority_round
938                .with_label_values(&[hostname])
939                .set((*round).into());
940        }
941
942        self.pending_commit_votes.push_back(commit.reference());
943        self.commits_to_write.push(commit);
944    }
945
946    /// Recovers commits to write from storage, at startup.
947    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
948        self.commits_to_write.extend(commits);
949    }
950
951    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
952        assert!(
953            self.commits_to_write.is_empty(),
954            "Commits to write should be empty. {:?}",
955            self.commits_to_write,
956        );
957    }
958
959    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
960        // We create an empty scoring subdag once reputation scores are calculated.
961        // Note: It is okay for this to not be gated by protocol config as the
962        // scoring_subdag should be empty in either case at this point.
963        assert!(self.scoring_subdag.is_empty());
964
965        let commit_info = CommitInfo {
966            committed_rounds: self.last_committed_rounds.clone(),
967            reputation_scores,
968        };
969        let last_commit = self
970            .last_commit
971            .as_ref()
972            .expect("Last commit should already be set.");
973        self.commit_info_to_write
974            .push((last_commit.reference(), commit_info));
975    }
976
977    pub(crate) fn add_finalized_commit(
978        &mut self,
979        commit_ref: CommitRef,
980        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
981    ) {
982        self.finalized_commits_to_write
983            .push((commit_ref, rejected_transactions));
984    }
985
986    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
987        let mut votes = Vec::new();
988        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
989            votes.push(self.pending_commit_votes.pop_front().unwrap());
990        }
991        votes
992    }
993
994    /// Index of the last commit.
995    pub(crate) fn last_commit_index(&self) -> CommitIndex {
996        match &self.last_commit {
997            Some(commit) => commit.index(),
998            None => 0,
999        }
1000    }
1001
1002    /// Digest of the last commit.
1003    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1004        match &self.last_commit {
1005            Some(commit) => commit.digest(),
1006            None => CommitDigest::MIN,
1007        }
1008    }
1009
1010    /// Timestamp of the last commit.
1011    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1012        match &self.last_commit {
1013            Some(commit) => commit.timestamp_ms(),
1014            None => 0,
1015        }
1016    }
1017
1018    /// Leader slot of the last commit.
1019    pub(crate) fn last_commit_leader(&self) -> Slot {
1020        match &self.last_commit {
1021            Some(commit) => commit.leader().into(),
1022            None => self
1023                .genesis
1024                .iter()
1025                .next()
1026                .map(|(genesis_ref, _)| *genesis_ref)
1027                .expect("Genesis blocks should always be available.")
1028                .into(),
1029        }
1030    }
1031
1032    /// Highest round where a block is committed, which is last commit's leader round.
1033    pub(crate) fn last_commit_round(&self) -> Round {
1034        match &self.last_commit {
1035            Some(commit) => commit.leader().round,
1036            None => 0,
1037        }
1038    }
1039
1040    /// Last committed round per authority.
1041    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1042        self.last_committed_rounds.clone()
1043    }
1044
1045    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
1046    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
1047    /// committed leader round. When GC is disabled that will return the genesis round.
1048    pub(crate) fn gc_round(&self) -> Round {
1049        self.calculate_gc_round(self.last_commit_round())
1050    }
1051
1052    /// Calculates the GC round from the input leader round, which can be different
1053    /// from the last committed leader round.
1054    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1055        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1056    }
1057
1058    /// Flushes unpersisted blocks, commits and commit info to storage.
1059    ///
1060    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
1061    /// must also be buffered.
1062    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
1063    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
1064    ///
1065    /// After each flush, DagState becomes persisted in storage and it expected to recover
1066    /// all internal states from storage after restarts.
1067    pub(crate) fn flush(&mut self) {
1068        let _s = self
1069            .context
1070            .metrics
1071            .node_metrics
1072            .scope_processing_time
1073            .with_label_values(&["DagState::flush"])
1074            .start_timer();
1075
1076        // Flush buffered data to storage.
1077        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1078        let pending_commits = std::mem::take(&mut self.commits_to_write);
1079        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1080        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1081        if pending_blocks.is_empty()
1082            && pending_commits.is_empty()
1083            && pending_commit_info.is_empty()
1084            && pending_finalized_commits.is_empty()
1085        {
1086            return;
1087        }
1088
1089        debug!(
1090            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1091            pending_blocks.len(),
1092            pending_blocks
1093                .iter()
1094                .map(|b| b.reference().to_string())
1095                .join(","),
1096            pending_commits.len(),
1097            pending_commits
1098                .iter()
1099                .map(|c| c.reference().to_string())
1100                .join(","),
1101            pending_commit_info.len(),
1102            pending_commit_info
1103                .iter()
1104                .map(|(commit_ref, _)| commit_ref.to_string())
1105                .join(","),
1106            pending_finalized_commits.len(),
1107            pending_finalized_commits
1108                .iter()
1109                .map(|(commit_ref, _)| commit_ref.to_string())
1110                .join(","),
1111        );
1112        self.store
1113            .write(WriteBatch::new(
1114                pending_blocks,
1115                pending_commits,
1116                pending_commit_info,
1117                pending_finalized_commits,
1118            ))
1119            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1120        self.context
1121            .metrics
1122            .node_metrics
1123            .dag_state_store_write_count
1124            .inc();
1125
1126        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
1127        for (authority_index, _) in self.context.committee.authorities() {
1128            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1129            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1130                if block_ref.round <= eviction_round {
1131                    self.recent_blocks.remove(block_ref);
1132                    self.recent_refs_by_authority[authority_index].pop_first();
1133                } else {
1134                    break;
1135                }
1136            }
1137            self.evicted_rounds[authority_index] = eviction_round;
1138        }
1139
1140        let metrics = &self.context.metrics.node_metrics;
1141        metrics
1142            .dag_state_recent_blocks
1143            .set(self.recent_blocks.len() as i64);
1144        metrics.dag_state_recent_refs.set(
1145            self.recent_refs_by_authority
1146                .iter()
1147                .map(BTreeSet::len)
1148                .sum::<usize>() as i64,
1149        );
1150    }
1151
1152    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1153        self.store
1154            .read_last_commit_info()
1155            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1156    }
1157
1158    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1159        self.scoring_subdag.add_subdags(scoring_subdags);
1160    }
1161
1162    pub(crate) fn clear_scoring_subdag(&mut self) {
1163        self.scoring_subdag.clear();
1164    }
1165
1166    pub(crate) fn scoring_subdags_count(&self) -> usize {
1167        self.scoring_subdag.scored_subdags_count()
1168    }
1169
1170    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1171        self.scoring_subdag.is_empty()
1172    }
1173
1174    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1175        self.scoring_subdag.calculate_distributed_vote_scores()
1176    }
1177
1178    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1179        self.scoring_subdag
1180            .commit_range
1181            .as_ref()
1182            .expect("commit range should exist for scoring subdag")
1183            .end()
1184    }
1185
1186    /// The last round that should get evicted after a cache clean up operation. After this round we are
1187    /// guaranteed to have all the produced blocks from that authority. For any round that is
1188    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1189    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1190        let last_round = self.recent_refs_by_authority[authority_index]
1191            .last()
1192            .map(|block_ref| block_ref.round)
1193            .unwrap_or(GENESIS_ROUND);
1194
1195        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1196    }
1197
1198    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1199    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1200    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1201        gc_round.min(last_round.saturating_sub(cached_rounds))
1202    }
1203
1204    /// Returns the underlying store.
1205    pub(crate) fn store(&self) -> Arc<dyn Store> {
1206        self.store.clone()
1207    }
1208
1209    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1210    /// the quorum even if that's genesis.
1211    #[cfg(test)]
1212    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1213        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1214        // a quorum then it means that our DAG has advanced with missing causal history.
1215        for round in
1216            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1217        {
1218            if round == GENESIS_ROUND {
1219                return self.genesis_blocks();
1220            }
1221            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1222            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1223
1224            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1225            let blocks = self.get_uncommitted_blocks_at_round(round);
1226            for block in &blocks {
1227                if quorum.add(block.author(), &self.context.committee) {
1228                    return blocks;
1229                }
1230            }
1231        }
1232
1233        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1234    }
1235
1236    #[cfg(test)]
1237    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1238        self.genesis.values().cloned().collect()
1239    }
1240
1241    #[cfg(test)]
1242    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1243        self.last_commit = Some(commit);
1244    }
1245}
1246
/// A cached block together with its `committed` and `included` flags.
struct BlockInfo {
    // The cached block.
    block: VerifiedBlock,
    // Whether the block has been committed
    committed: bool,
    /// Whether the block has been included in the causal history of an owned proposed block.
    ///
    /// There are two usages of this field:
    /// 1. When proposing blocks, determine the set of blocks to carry votes for.
    /// 2. When recovering, determine if a block has not been included in a proposed block and
    ///    should recover transaction votes by voting.
    included: bool,
}
1259
1260impl BlockInfo {
1261    fn new(block: VerifiedBlock) -> Self {
1262        Self {
1263            block,
1264            committed: false,
1265            included: false,
1266        }
1267    }
1268}
1269
1270#[cfg(test)]
1271mod test {
1272    use std::vec;
1273
1274    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1275    use parking_lot::RwLock;
1276
1277    use super::*;
1278    use crate::{
1279        block::{TestBlock, VerifiedBlock},
1280        storage::{WriteBatch, mem_store::MemStore},
1281        test_dag_builder::DagBuilder,
1282        test_dag_parser::parse_dag,
1283    };
1284
1285    #[tokio::test]
1286    async fn test_get_blocks() {
1287        let (context, _) = Context::new_for_test(4);
1288        let context = Arc::new(context);
1289        let store = Arc::new(MemStore::new());
1290        let mut dag_state = DagState::new(context.clone(), store.clone());
1291        let own_index = AuthorityIndex::new_for_test(0);
1292
1293        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1294        let num_rounds: u32 = 10;
1295        let non_existent_round: u32 = 100;
1296        let num_authorities: u32 = 3;
1297        let num_blocks_per_slot: usize = 3;
1298        let mut blocks = BTreeMap::new();
1299        for round in 1..=num_rounds {
1300            for author in 0..num_authorities {
1301                // Create 3 blocks per slot, with different timestamps and digests.
1302                let base_ts = round as BlockTimestampMs * 1000;
1303                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1304                    let block = VerifiedBlock::new_for_test(
1305                        TestBlock::new(round, author)
1306                            .set_timestamp_ms(timestamp)
1307                            .build(),
1308                    );
1309                    dag_state.accept_block(block.clone());
1310                    blocks.insert(block.reference(), block);
1311
1312                    // Only write one block per slot for own index
1313                    if AuthorityIndex::new_for_test(author) == own_index {
1314                        break;
1315                    }
1316                }
1317            }
1318        }
1319
1320        // Check uncommitted blocks that exist.
1321        for (r, block) in &blocks {
1322            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1323        }
1324
1325        // Check uncommitted blocks that do not exist.
1326        let last_ref = blocks.keys().last().unwrap();
1327        assert!(
1328            dag_state
1329                .get_block(&BlockRef::new(
1330                    last_ref.round,
1331                    last_ref.author,
1332                    BlockDigest::MIN
1333                ))
1334                .is_none()
1335        );
1336
1337        // Check slots with uncommitted blocks.
1338        for round in 1..=num_rounds {
1339            for author in 0..num_authorities {
1340                let slot = Slot::new(
1341                    round,
1342                    context
1343                        .committee
1344                        .to_authority_index(author as usize)
1345                        .unwrap(),
1346                );
1347                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1348
1349                // We only write one block per slot for own index
1350                if AuthorityIndex::new_for_test(author) == own_index {
1351                    assert_eq!(blocks.len(), 1);
1352                } else {
1353                    assert_eq!(blocks.len(), num_blocks_per_slot);
1354                }
1355
1356                for b in blocks {
1357                    assert_eq!(b.round(), round);
1358                    assert_eq!(
1359                        b.author(),
1360                        context
1361                            .committee
1362                            .to_authority_index(author as usize)
1363                            .unwrap()
1364                    );
1365                }
1366            }
1367        }
1368
1369        // Check slots without uncommitted blocks.
1370        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1371        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1372
1373        // Check rounds with uncommitted blocks.
1374        for round in 1..=num_rounds {
1375            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1376            // Expect 3 blocks per authority except for own authority which should
1377            // have 1 block.
1378            assert_eq!(
1379                blocks.len(),
1380                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1381            );
1382            for b in blocks {
1383                assert_eq!(b.round(), round);
1384            }
1385        }
1386
1387        // Check rounds without uncommitted blocks.
1388        assert!(
1389            dag_state
1390                .get_uncommitted_blocks_at_round(non_existent_round)
1391                .is_empty()
1392        );
1393    }
1394
1395    #[tokio::test]
1396    async fn test_ancestors_at_uncommitted_round() {
1397        // Initialize DagState.
1398        let (context, _) = Context::new_for_test(4);
1399        let context = Arc::new(context);
1400        let store = Arc::new(MemStore::new());
1401        let mut dag_state = DagState::new(context.clone(), store.clone());
1402
1403        // Populate DagState.
1404
1405        // Round 10 refs will not have their blocks in DagState.
1406        let round_10_refs: Vec<_> = (0..4)
1407            .map(|a| {
1408                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1409                    .reference()
1410            })
1411            .collect();
1412
1413        // Round 11 blocks.
1414        let round_11 = [
1415            // This will connect to round 12.
1416            VerifiedBlock::new_for_test(
1417                TestBlock::new(11, 0)
1418                    .set_timestamp_ms(1100)
1419                    .set_ancestors(round_10_refs.clone())
1420                    .build(),
1421            ),
1422            // Slot(11, 1) has 3 blocks.
1423            // This will connect to round 12.
1424            VerifiedBlock::new_for_test(
1425                TestBlock::new(11, 1)
1426                    .set_timestamp_ms(1110)
1427                    .set_ancestors(round_10_refs.clone())
1428                    .build(),
1429            ),
1430            // This will connect to round 13.
1431            VerifiedBlock::new_for_test(
1432                TestBlock::new(11, 1)
1433                    .set_timestamp_ms(1111)
1434                    .set_ancestors(round_10_refs.clone())
1435                    .build(),
1436            ),
1437            // This will not connect to any block.
1438            VerifiedBlock::new_for_test(
1439                TestBlock::new(11, 1)
1440                    .set_timestamp_ms(1112)
1441                    .set_ancestors(round_10_refs.clone())
1442                    .build(),
1443            ),
1444            // This will not connect to any block.
1445            VerifiedBlock::new_for_test(
1446                TestBlock::new(11, 2)
1447                    .set_timestamp_ms(1120)
1448                    .set_ancestors(round_10_refs.clone())
1449                    .build(),
1450            ),
1451            // This will connect to round 12.
1452            VerifiedBlock::new_for_test(
1453                TestBlock::new(11, 3)
1454                    .set_timestamp_ms(1130)
1455                    .set_ancestors(round_10_refs.clone())
1456                    .build(),
1457            ),
1458        ];
1459
1460        // Round 12 blocks.
1461        let ancestors_for_round_12 = vec![
1462            round_11[0].reference(),
1463            round_11[1].reference(),
1464            round_11[5].reference(),
1465        ];
1466        let round_12 = [
1467            VerifiedBlock::new_for_test(
1468                TestBlock::new(12, 0)
1469                    .set_timestamp_ms(1200)
1470                    .set_ancestors(ancestors_for_round_12.clone())
1471                    .build(),
1472            ),
1473            VerifiedBlock::new_for_test(
1474                TestBlock::new(12, 2)
1475                    .set_timestamp_ms(1220)
1476                    .set_ancestors(ancestors_for_round_12.clone())
1477                    .build(),
1478            ),
1479            VerifiedBlock::new_for_test(
1480                TestBlock::new(12, 3)
1481                    .set_timestamp_ms(1230)
1482                    .set_ancestors(ancestors_for_round_12.clone())
1483                    .build(),
1484            ),
1485        ];
1486
1487        // Round 13 blocks.
1488        let ancestors_for_round_13 = vec![
1489            round_12[0].reference(),
1490            round_12[1].reference(),
1491            round_12[2].reference(),
1492            round_11[2].reference(),
1493        ];
1494        let round_13 = [
1495            VerifiedBlock::new_for_test(
1496                TestBlock::new(12, 1)
1497                    .set_timestamp_ms(1300)
1498                    .set_ancestors(ancestors_for_round_13.clone())
1499                    .build(),
1500            ),
1501            VerifiedBlock::new_for_test(
1502                TestBlock::new(12, 2)
1503                    .set_timestamp_ms(1320)
1504                    .set_ancestors(ancestors_for_round_13.clone())
1505                    .build(),
1506            ),
1507            VerifiedBlock::new_for_test(
1508                TestBlock::new(12, 3)
1509                    .set_timestamp_ms(1330)
1510                    .set_ancestors(ancestors_for_round_13.clone())
1511                    .build(),
1512            ),
1513        ];
1514
1515        // Round 14 anchor block.
1516        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1517        let anchor = VerifiedBlock::new_for_test(
1518            TestBlock::new(14, 1)
1519                .set_timestamp_ms(1410)
1520                .set_ancestors(ancestors_for_round_14)
1521                .build(),
1522        );
1523
1524        // Add all blocks (at and above round 11) to DagState.
1525        for b in round_11
1526            .iter()
1527            .chain(round_12.iter())
1528            .chain(round_13.iter())
1529            .chain([anchor.clone()].iter())
1530        {
1531            dag_state.accept_block(b.clone());
1532        }
1533
1534        // Check ancestors connected to anchor.
1535        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1536        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1537        ancestors_refs.sort();
1538        let mut expected_refs = vec![
1539            round_11[0].reference(),
1540            round_11[1].reference(),
1541            round_11[2].reference(),
1542            round_11[5].reference(),
1543        ];
1544        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1545        assert_eq!(
1546            ancestors_refs, expected_refs,
1547            "Expected round 11 ancestors: {:?}. Got: {:?}",
1548            expected_refs, ancestors_refs
1549        );
1550    }
1551
1552    #[tokio::test]
1553    async fn test_link_causal_history() {
1554        let (mut context, _) = Context::new_for_test(4);
1555        context.parameters.dag_state_cached_rounds = 10;
1556        context.protocol_config.set_gc_depth_for_testing(3);
1557        let context = Arc::new(context);
1558
1559        let store = Arc::new(MemStore::new());
1560        let mut dag_state = DagState::new(context.clone(), store.clone());
1561
1562        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1563        let mut dag_builder = DagBuilder::new(context.clone());
1564        dag_builder.layers(1..=3).build();
1565        dag_builder
1566            .layers(4..=6)
1567            .authorities(vec![AuthorityIndex::new_for_test(0)])
1568            .skip_block()
1569            .build();
1570
1571        // Accept all blocks
1572        let all_blocks = dag_builder.all_blocks();
1573        dag_state.accept_blocks(all_blocks.clone());
1574
1575        // No block is linked yet.
1576        for block in &all_blocks {
1577            assert!(!dag_state.has_been_included(&block.reference()));
1578        }
1579
1580        // Link causal history from a round 1 block.
1581        let round_1_block = &all_blocks[1];
1582        assert_eq!(round_1_block.round(), 1);
1583        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1584
1585        // Check that the block is linked.
1586        assert_eq!(linked_blocks.len(), 1);
1587        assert_eq!(linked_blocks[0], round_1_block.reference());
1588        for block_ref in linked_blocks {
1589            assert!(dag_state.has_been_included(&block_ref));
1590        }
1591
1592        // Link causal history from a round 2 block.
1593        let round_2_block = &all_blocks[4];
1594        assert_eq!(round_2_block.round(), 2);
1595        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1596
1597        // Check the linked blocks.
1598        assert_eq!(linked_blocks.len(), 4);
1599        for block_ref in linked_blocks {
1600            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1601        }
1602
1603        // Check linked status in dag state.
1604        for block in &all_blocks {
1605            if block.round() == 1 || block.reference() == round_2_block.reference() {
1606                assert!(dag_state.has_been_included(&block.reference()));
1607            } else {
1608                assert!(!dag_state.has_been_included(&block.reference()));
1609            }
1610        }
1611
1612        // Select round 6 block.
1613        let round_6_block = all_blocks.last().unwrap();
1614        assert_eq!(round_6_block.round(), 6);
1615
1616        // Get GC round to 3.
1617        let last_commit = TrustedCommit::new_for_test(
1618            6,
1619            CommitDigest::MIN,
1620            context.clock.timestamp_utc_ms(),
1621            round_6_block.reference(),
1622            vec![],
1623        );
1624        dag_state.set_last_commit(last_commit);
1625        assert_eq!(
1626            dag_state.gc_round(),
1627            3,
1628            "GC round should have moved to round 3"
1629        );
1630
1631        // Link causal history from a round 6 block.
1632        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1633
1634        // Check the linked blocks. They should not include GC'ed blocks.
1635        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1636        for block_ref in linked_blocks {
1637            assert!(
1638                block_ref.round == 4
1639                    || block_ref.round == 5
1640                    || block_ref == round_6_block.reference()
1641            );
1642        }
1643
1644        // Check linked status in dag state.
1645        for block in &all_blocks {
1646            let block_ref = block.reference();
1647            if block.round() == 1
1648                || block_ref == round_2_block.reference()
1649                || block_ref.round == 4
1650                || block_ref.round == 5
1651                || block_ref == round_6_block.reference()
1652            {
1653                assert!(dag_state.has_been_included(&block.reference()));
1654            } else {
1655                assert!(!dag_state.has_been_included(&block.reference()));
1656            }
1657        }
1658    }
1659
1660    #[tokio::test]
1661    async fn test_contains_blocks_in_cache_or_store() {
1662        /// Only keep elements up to 2 rounds before the last committed round
1663        const CACHED_ROUNDS: Round = 2;
1664
1665        let (mut context, _) = Context::new_for_test(4);
1666        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1667
1668        let context = Arc::new(context);
1669        let store = Arc::new(MemStore::new());
1670        let mut dag_state = DagState::new(context.clone(), store.clone());
1671
1672        // Create test blocks for round 1 ~ 10
1673        let num_rounds: u32 = 10;
1674        let num_authorities: u32 = 4;
1675        let mut blocks = Vec::new();
1676
1677        for round in 1..=num_rounds {
1678            for author in 0..num_authorities {
1679                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1680                blocks.push(block);
1681            }
1682        }
1683
1684        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1685        blocks.clone().into_iter().for_each(|block| {
1686            if block.round() <= 4 {
1687                store
1688                    .write(WriteBatch::default().blocks(vec![block]))
1689                    .unwrap();
1690            } else {
1691                dag_state.accept_blocks(vec![block]);
1692            }
1693        });
1694
1695        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1696        // where the blocks of first 4 round should be found in DagState and the rest in store.
1697        let mut block_refs = blocks
1698            .iter()
1699            .map(|block| block.reference())
1700            .collect::<Vec<_>>();
1701        let result = dag_state.contains_blocks(block_refs.clone());
1702
1703        // Ensure everything is found
1704        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1705        assert_eq!(result, expected);
1706
1707        // Now try to ask also for one block ref that is neither in cache nor in store
1708        block_refs.insert(
1709            3,
1710            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1711        );
1712        let result = dag_state.contains_blocks(block_refs.clone());
1713
1714        // Then all should be found apart from the last one
1715        expected.insert(3, false);
1716        assert_eq!(result, expected.clone());
1717    }
1718
1719    #[tokio::test]
1720    async fn test_contains_cached_block_at_slot() {
1721        /// Only keep elements up to 2 rounds before the last committed round
1722        const CACHED_ROUNDS: Round = 2;
1723
1724        let num_authorities: u32 = 4;
1725        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1726        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1727
1728        let context = Arc::new(context);
1729        let store = Arc::new(MemStore::new());
1730        let mut dag_state = DagState::new(context.clone(), store.clone());
1731
1732        // Create test blocks for round 1 ~ 10
1733        let num_rounds: u32 = 10;
1734        let mut blocks = Vec::new();
1735
1736        for round in 1..=num_rounds {
1737            for author in 0..num_authorities {
1738                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1739                blocks.push(block.clone());
1740                dag_state.accept_block(block);
1741            }
1742        }
1743
1744        // Query for genesis round 0, genesis blocks should be returned
1745        for (author, _) in context.committee.authorities() {
1746            assert!(
1747                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1748                "Genesis should always be found"
1749            );
1750        }
1751
1752        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1753        // where the blocks of first 4 round should be found in DagState and the rest in store.
1754        let mut block_refs = blocks
1755            .iter()
1756            .map(|block| block.reference())
1757            .collect::<Vec<_>>();
1758
1759        for block_ref in block_refs.clone() {
1760            let slot = block_ref.into();
1761            let found = dag_state.contains_cached_block_at_slot(slot);
1762            assert!(found, "A block should be found at slot {}", slot);
1763        }
1764
1765        // Now try to ask also for one block ref that is not in cache
1766        // Then all should be found apart from the last one
1767        block_refs.insert(
1768            3,
1769            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1770        );
1771        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1772        expected.insert(3, false);
1773
1774        // Attempt to check the same for via the contains slot method
1775        for block_ref in block_refs {
1776            let slot = block_ref.into();
1777            let found = dag_state.contains_cached_block_at_slot(slot);
1778
1779            assert_eq!(expected.remove(0), found);
1780        }
1781    }
1782
1783    #[tokio::test]
1784    #[ignore]
1785    #[should_panic(
1786        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1787    )]
1788    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1789        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1790        /// for the correct node operation.
1791        const GC_DEPTH: u32 = 2;
1792        /// Keep at least 3 rounds in cache for each authority.
1793        const CACHED_ROUNDS: Round = 3;
1794
1795        let (mut context, _) = Context::new_for_test(4);
1796        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1797        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1798
1799        let context = Arc::new(context);
1800        let store = Arc::new(MemStore::new());
1801        let mut dag_state = DagState::new(context.clone(), store.clone());
1802
1803        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1804        let mut dag_builder = DagBuilder::new(context.clone());
1805        dag_builder.layers(1..=3).build();
1806        dag_builder
1807            .layers(4..=6)
1808            .authorities(vec![AuthorityIndex::new_for_test(0)])
1809            .skip_block()
1810            .build();
1811
1812        // Accept all blocks
1813        dag_builder
1814            .all_blocks()
1815            .into_iter()
1816            .for_each(|block| dag_state.accept_block(block));
1817
1818        // Now add a commit for leader round 5 to trigger an eviction
1819        dag_state.add_commit(TrustedCommit::new_for_test(
1820            1 as CommitIndex,
1821            CommitDigest::MIN,
1822            0,
1823            dag_builder.leader_block(5).unwrap().reference(),
1824            vec![],
1825        ));
1826        // Flush the DAG state to storage.
1827        dag_state.flush();
1828
1829        // Ensure that gc round has been updated
1830        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1831
1832        // Now what we expect to happen is for:
1833        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1834        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1835        for authority_index in 1..=3 {
1836            for round in 4..=6 {
1837                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1838                    round,
1839                    AuthorityIndex::new_for_test(authority_index)
1840                )));
1841            }
1842        }
1843
1844        for round in 1..=3 {
1845            assert!(
1846                dag_state.contains_cached_block_at_slot(Slot::new(
1847                    round,
1848                    AuthorityIndex::new_for_test(0)
1849                ))
1850            );
1851        }
1852
1853        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1854        // that is <= 3 should be evicted
1855        let _ =
1856            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1857    }
1858
1859    #[tokio::test]
1860    async fn test_get_blocks_in_cache_or_store() {
1861        let (context, _) = Context::new_for_test(4);
1862        let context = Arc::new(context);
1863        let store = Arc::new(MemStore::new());
1864        let mut dag_state = DagState::new(context.clone(), store.clone());
1865
1866        // Create test blocks for round 1 ~ 10
1867        let num_rounds: u32 = 10;
1868        let num_authorities: u32 = 4;
1869        let mut blocks = Vec::new();
1870
1871        for round in 1..=num_rounds {
1872            for author in 0..num_authorities {
1873                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1874                blocks.push(block);
1875            }
1876        }
1877
1878        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1879        blocks.clone().into_iter().for_each(|block| {
1880            if block.round() <= 4 {
1881                store
1882                    .write(WriteBatch::default().blocks(vec![block]))
1883                    .unwrap();
1884            } else {
1885                dag_state.accept_blocks(vec![block]);
1886            }
1887        });
1888
1889        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1890        // where the blocks of first 4 round should be found in DagState and the rest in store.
1891        let mut block_refs = blocks
1892            .iter()
1893            .map(|block| block.reference())
1894            .collect::<Vec<_>>();
1895        let result = dag_state.get_blocks(&block_refs);
1896
1897        let mut expected = blocks
1898            .into_iter()
1899            .map(Some)
1900            .collect::<Vec<Option<VerifiedBlock>>>();
1901
1902        // Ensure everything is found
1903        assert_eq!(result, expected.clone());
1904
1905        // Now try to ask also for one block ref that is neither in cache nor in store
1906        block_refs.insert(
1907            3,
1908            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1909        );
1910        let result = dag_state.get_blocks(&block_refs);
1911
1912        // Then all should be found apart from the last one
1913        expected.insert(3, None);
1914        assert_eq!(result, expected);
1915    }
1916
1917    #[tokio::test]
1918    async fn test_flush_and_recovery() {
1919        telemetry_subscribers::init_for_testing();
1920
1921        const GC_DEPTH: u32 = 3;
1922        const CACHED_ROUNDS: u32 = 4;
1923
1924        let num_authorities: u32 = 4;
1925        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1926        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1927        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1928
1929        let context = Arc::new(context);
1930
1931        let store = Arc::new(MemStore::new());
1932        let mut dag_state = DagState::new(context.clone(), store.clone());
1933
1934        const NUM_ROUNDS: Round = 20;
1935        let mut dag_builder = DagBuilder::new(context.clone());
1936        dag_builder.layers(1..=5).build();
1937        dag_builder
1938            .layers(6..=8)
1939            .authorities(vec![AuthorityIndex::new_for_test(0)])
1940            .skip_block()
1941            .build();
1942        dag_builder.layers(9..=NUM_ROUNDS).build();
1943
1944        // Get all commits from the DAG builder.
1945        const LAST_COMMIT_ROUND: Round = 16;
1946        const LAST_COMMIT_INDEX: CommitIndex = 15;
1947        let commits = dag_builder
1948            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1949            .into_iter()
1950            .map(|(_subdag, commit)| commit)
1951            .take(LAST_COMMIT_INDEX as usize)
1952            .collect::<Vec<_>>();
1953        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1954        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1955
1956        // Add the blocks from first 11 rounds and first 8 commits to the dag state
1957        // Note that the commit of round 8 is missing because where authority 0 is the leader but produced no block.
1958        // So commit 8 has leader round 9.
1959        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1960        const NUM_PERSISTED_COMMITS: usize = 8;
1961        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1962        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1963        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1964        let mut finalized_commits = vec![];
1965        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1966            finalized_commits.push(commit.clone());
1967            dag_state.add_commit(commit);
1968        }
1969        let last_finalized_commit = finalized_commits.last().unwrap();
1970        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1971        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1972
1973        // Collect finalized blocks.
1974        let finalized_blocks = finalized_commits
1975            .iter()
1976            .flat_map(|commit| commit.blocks())
1977            .collect::<BTreeSet<_>>();
1978
1979        // Flush commits from the dag state
1980        dag_state.flush();
1981
1982        // Verify the store has blocks up to round 12, and commits up to index 8.
1983        let store_blocks = store
1984            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1985            .unwrap();
1986        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1987        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1988        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1989        assert_eq!(
1990            store_commits.last().unwrap().index(),
1991            LAST_PERSISTED_COMMIT_INDEX
1992        );
1993        assert_eq!(
1994            store_commits.last().unwrap().round(),
1995            LAST_PERSISTED_COMMIT_ROUND
1996        );
1997
1998        // Add the rest of the blocks and commits to the dag state
1999        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
2000        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
2001            dag_state.add_commit(commit);
2002        }
2003
2004        // All blocks should be found in DagState.
2005        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
2006        let block_refs = all_blocks
2007            .iter()
2008            .map(|block| block.reference())
2009            .collect::<Vec<_>>();
2010        let result = dag_state
2011            .get_blocks(&block_refs)
2012            .into_iter()
2013            .map(|b| b.unwrap())
2014            .collect::<Vec<_>>();
2015        assert_eq!(result, all_blocks);
2016
2017        // Last commit index from DagState should now be 15
2018        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
2019
2020        // Destroy the dag state without flushing additional data.
2021        drop(dag_state);
2022
2023        // Recover the state from the store
2024        let dag_state = DagState::new(context.clone(), store.clone());
2025
2026        // Persisted blocks rounds should be found in DagState.
2027        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2028        let block_refs = all_blocks
2029            .iter()
2030            .map(|block| block.reference())
2031            .collect::<Vec<_>>();
2032        let result = dag_state
2033            .get_blocks(&block_refs)
2034            .into_iter()
2035            .map(|b| b.unwrap())
2036            .collect::<Vec<_>>();
2037        assert_eq!(result, all_blocks);
2038
2039        // Unpersisted blocks should not be in DagState, because they are not flushed.
2040        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2041        let block_refs = missing_blocks
2042            .iter()
2043            .map(|block| block.reference())
2044            .collect::<Vec<_>>();
2045        let retrieved_blocks = dag_state
2046            .get_blocks(&block_refs)
2047            .into_iter()
2048            .flatten()
2049            .collect::<Vec<_>>();
2050        assert!(retrieved_blocks.is_empty());
2051
2052        // Recovered last commit index and round should be 8 and 9.
2053        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2054        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2055
2056        // The last_commit_rounds of the finalized commits should have been recovered.
2057        let expected_last_committed_rounds = vec![5, 9, 8, 8];
2058        assert_eq!(
2059            dag_state.last_committed_rounds(),
2060            expected_last_committed_rounds
2061        );
2062        // Unscored subdags will be recovered based on the flushed commits and no commit info.
2063        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2064
2065        // Ensure that cached blocks exist only for specific rounds per authority
2066        for (authority_index, _) in context.committee.authorities() {
2067            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2068
2069            // Ensure that eviction rounds have been properly recovered.
2070            // For every authority, the gc round is 9 - 3 = 6, and cached round is 12-5 = 7.
2071            // So eviction round is the min which is 6.
2072            if authority_index == AuthorityIndex::new_for_test(0) {
2073                assert_eq!(blocks.len(), 4);
2074                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2075                assert!(
2076                    blocks
2077                        .into_iter()
2078                        .all(|block| block.round() >= 7 && block.round() <= 12)
2079                );
2080            } else {
2081                assert_eq!(blocks.len(), 6);
2082                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2083                assert!(
2084                    blocks
2085                        .into_iter()
2086                        .all(|block| block.round() >= 7 && block.round() <= 12)
2087                );
2088            }
2089        }
2090
2091        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
2092        let gc_round = dag_state.gc_round();
2093        assert_eq!(gc_round, 6);
2094        dag_state
2095            .recent_blocks
2096            .iter()
2097            .for_each(|(block_ref, block_info)| {
2098                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2099                    assert!(
2100                        block_info.committed,
2101                        "Block {:?} should be set as committed",
2102                        block_ref
2103                    );
2104                }
2105            });
2106
2107        // Ensure the hard linked status of blocks are recovered.
2108        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
2109        // Other blocks (round 12 but not from authority 0) should not be hard linked.
2110        // This is because authority 0 blocks are considered proposed blocks.
2111        dag_state
2112            .recent_blocks
2113            .iter()
2114            .for_each(|(block_ref, block_info)| {
2115                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2116                    assert!(block_info.included);
2117                } else {
2118                    assert!(!block_info.included);
2119                }
2120            });
2121    }
2122
2123    #[tokio::test]
2124    async fn test_block_info_as_committed() {
2125        let num_authorities: u32 = 4;
2126        let (context, _) = Context::new_for_test(num_authorities as usize);
2127        let context = Arc::new(context);
2128
2129        let store = Arc::new(MemStore::new());
2130        let mut dag_state = DagState::new(context.clone(), store.clone());
2131
2132        // Accept a block
2133        let block = VerifiedBlock::new_for_test(
2134            TestBlock::new(1, 0)
2135                .set_timestamp_ms(1000)
2136                .set_ancestors(vec![])
2137                .build(),
2138        );
2139
2140        dag_state.accept_block(block.clone());
2141
2142        // Query is committed
2143        assert!(!dag_state.is_committed(&block.reference()));
2144
2145        // Set block as committed for first time should return true
2146        assert!(
2147            dag_state.set_committed(&block.reference()),
2148            "Block should be successfully set as committed for first time"
2149        );
2150
2151        // Now it should appear as committed
2152        assert!(dag_state.is_committed(&block.reference()));
2153
2154        // Trying to set the block as committed again, it should return false.
2155        assert!(
2156            !dag_state.set_committed(&block.reference()),
2157            "Block should not be successfully set as committed"
2158        );
2159    }
2160
2161    #[tokio::test]
2162    async fn test_get_cached_blocks() {
2163        let (mut context, _) = Context::new_for_test(4);
2164        context.parameters.dag_state_cached_rounds = 5;
2165
2166        let context = Arc::new(context);
2167        let store = Arc::new(MemStore::new());
2168        let mut dag_state = DagState::new(context.clone(), store.clone());
2169
2170        // Create no blocks for authority 0
2171        // Create one block (round 10) for authority 1
2172        // Create two blocks (rounds 10,11) for authority 2
2173        // Create three blocks (rounds 10,11,12) for authority 3
2174        let mut all_blocks = Vec::new();
2175        for author in 1..=3 {
2176            for round in 10..(10 + author) {
2177                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2178                all_blocks.push(block.clone());
2179                dag_state.accept_block(block);
2180            }
2181        }
2182
2183        // Test get_cached_blocks()
2184
2185        let cached_blocks =
2186            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2187        assert!(cached_blocks.is_empty());
2188
2189        let cached_blocks =
2190            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2191        assert_eq!(cached_blocks.len(), 1);
2192        assert_eq!(cached_blocks[0].round(), 10);
2193
2194        let cached_blocks =
2195            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2196        assert_eq!(cached_blocks.len(), 2);
2197        assert_eq!(cached_blocks[0].round(), 10);
2198        assert_eq!(cached_blocks[1].round(), 11);
2199
2200        let cached_blocks =
2201            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2202        assert_eq!(cached_blocks.len(), 1);
2203        assert_eq!(cached_blocks[0].round(), 11);
2204
2205        let cached_blocks =
2206            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2207        assert_eq!(cached_blocks.len(), 3);
2208        assert_eq!(cached_blocks[0].round(), 10);
2209        assert_eq!(cached_blocks[1].round(), 11);
2210        assert_eq!(cached_blocks[2].round(), 12);
2211
2212        let cached_blocks =
2213            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2214        assert_eq!(cached_blocks.len(), 1);
2215        assert_eq!(cached_blocks[0].round(), 12);
2216
2217        // Test get_cached_blocks_in_range()
2218
2219        // Start == end
2220        let cached_blocks = dag_state.get_cached_blocks_in_range(
2221            context.committee.to_authority_index(3).unwrap(),
2222            10,
2223            10,
2224            1,
2225        );
2226        assert!(cached_blocks.is_empty());
2227
2228        // Start > end
2229        let cached_blocks = dag_state.get_cached_blocks_in_range(
2230            context.committee.to_authority_index(3).unwrap(),
2231            11,
2232            10,
2233            1,
2234        );
2235        assert!(cached_blocks.is_empty());
2236
2237        // Empty result.
2238        let cached_blocks = dag_state.get_cached_blocks_in_range(
2239            context.committee.to_authority_index(0).unwrap(),
2240            9,
2241            10,
2242            1,
2243        );
2244        assert!(cached_blocks.is_empty());
2245
2246        // Single block, one round before the end.
2247        let cached_blocks = dag_state.get_cached_blocks_in_range(
2248            context.committee.to_authority_index(1).unwrap(),
2249            9,
2250            11,
2251            1,
2252        );
2253        assert_eq!(cached_blocks.len(), 1);
2254        assert_eq!(cached_blocks[0].round(), 10);
2255
2256        // Respect end round.
2257        let cached_blocks = dag_state.get_cached_blocks_in_range(
2258            context.committee.to_authority_index(2).unwrap(),
2259            9,
2260            12,
2261            5,
2262        );
2263        assert_eq!(cached_blocks.len(), 2);
2264        assert_eq!(cached_blocks[0].round(), 10);
2265        assert_eq!(cached_blocks[1].round(), 11);
2266
2267        // Respect start round.
2268        let cached_blocks = dag_state.get_cached_blocks_in_range(
2269            context.committee.to_authority_index(3).unwrap(),
2270            11,
2271            20,
2272            5,
2273        );
2274        assert_eq!(cached_blocks.len(), 2);
2275        assert_eq!(cached_blocks[0].round(), 11);
2276        assert_eq!(cached_blocks[1].round(), 12);
2277
2278        // Respect limit
2279        let cached_blocks = dag_state.get_cached_blocks_in_range(
2280            context.committee.to_authority_index(3).unwrap(),
2281            10,
2282            20,
2283            1,
2284        );
2285        assert_eq!(cached_blocks.len(), 1);
2286        assert_eq!(cached_blocks[0].round(), 10);
2287    }
2288
2289    #[tokio::test]
2290    async fn test_get_last_cached_block() {
2291        // GIVEN
2292        const CACHED_ROUNDS: Round = 2;
2293        const GC_DEPTH: u32 = 1;
2294        let (mut context, _) = Context::new_for_test(4);
2295        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2296        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2297
2298        let context = Arc::new(context);
2299        let store = Arc::new(MemStore::new());
2300        let mut dag_state = DagState::new(context.clone(), store.clone());
2301
2302        // Create no blocks for authority 0
2303        // Create one block (round 1) for authority 1
2304        // Create two blocks (rounds 1,2) for authority 2
2305        // Create three blocks (rounds 1,2,3) for authority 3
2306        let dag_str = "DAG {
2307            Round 0 : { 4 },
2308            Round 1 : {
2309                B -> [*],
2310                C -> [*],
2311                D -> [*],
2312            },
2313            Round 2 : {
2314                C -> [*],
2315                D -> [*],
2316            },
2317            Round 3 : {
2318                D -> [*],
2319            },
2320        }";
2321
2322        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2323
2324        // Add equivocating block for round 2 authority 3
2325        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2326
2327        // Accept all blocks
2328        for block in dag_builder
2329            .all_blocks()
2330            .into_iter()
2331            .chain(std::iter::once(block))
2332        {
2333            dag_state.accept_block(block);
2334        }
2335
2336        dag_state.add_commit(TrustedCommit::new_for_test(
2337            1 as CommitIndex,
2338            CommitDigest::MIN,
2339            context.clock.timestamp_utc_ms(),
2340            dag_builder.leader_block(3).unwrap().reference(),
2341            vec![],
2342        ));
2343
2344        // WHEN search for the latest blocks
2345        let end_round = 4;
2346        let expected_rounds = vec![0, 1, 2, 3];
2347        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2348        // THEN
2349        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2350        assert_eq!(
2351            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2352            expected_rounds
2353        );
2354        assert_eq!(
2355            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2356            expected_excluded_and_equivocating_blocks
2357        );
2358
2359        // THEN
2360        for (i, expected_round) in expected_rounds.iter().enumerate() {
2361            let round = dag_state
2362                .get_last_cached_block_in_range(
2363                    context.committee.to_authority_index(i).unwrap(),
2364                    0,
2365                    end_round,
2366                )
2367                .map(|b| b.round())
2368                .unwrap_or_default();
2369            assert_eq!(round, *expected_round, "Authority {i}");
2370        }
2371
2372        // WHEN starting from round 2
2373        let start_round = 2;
2374        let expected_rounds = [0, 0, 2, 3];
2375
2376        // THEN
2377        for (i, expected_round) in expected_rounds.iter().enumerate() {
2378            let round = dag_state
2379                .get_last_cached_block_in_range(
2380                    context.committee.to_authority_index(i).unwrap(),
2381                    start_round,
2382                    end_round,
2383                )
2384                .map(|b| b.round())
2385                .unwrap_or_default();
2386            assert_eq!(round, *expected_round, "Authority {i}");
2387        }
2388
2389        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
2390        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
2391        //
2392        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
2393        // all their highest round blocks for CACHED_ROUNDS.
2394        dag_state.flush();
2395
2396        // AND we request before round 3
2397        let end_round = 3;
2398        let expected_rounds = vec![0, 1, 2, 2];
2399
2400        // THEN
2401        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2402        assert_eq!(
2403            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2404            expected_rounds
2405        );
2406
2407        // THEN
2408        for (i, expected_round) in expected_rounds.iter().enumerate() {
2409            let round = dag_state
2410                .get_last_cached_block_in_range(
2411                    context.committee.to_authority_index(i).unwrap(),
2412                    0,
2413                    end_round,
2414                )
2415                .map(|b| b.round())
2416                .unwrap_or_default();
2417            assert_eq!(round, *expected_round, "Authority {i}");
2418        }
2419    }
2420
2421    #[tokio::test]
2422    #[should_panic(
2423        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2424    )]
2425    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2426        // GIVEN
2427        const CACHED_ROUNDS: Round = 1;
2428        const GC_DEPTH: u32 = 1;
2429        let (mut context, _) = Context::new_for_test(4);
2430        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2431        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2432
2433        let context = Arc::new(context);
2434        let store = Arc::new(MemStore::new());
2435        let mut dag_state = DagState::new(context.clone(), store.clone());
2436
2437        // Create no blocks for authority 0
2438        // Create one block (round 1) for authority 1
2439        // Create two blocks (rounds 1,2) for authority 2
2440        // Create three blocks (rounds 1,2,3) for authority 3
2441        let mut dag_builder = DagBuilder::new(context.clone());
2442        dag_builder
2443            .layers(1..=1)
2444            .authorities(vec![AuthorityIndex::new_for_test(0)])
2445            .skip_block()
2446            .build();
2447        dag_builder
2448            .layers(2..=2)
2449            .authorities(vec![
2450                AuthorityIndex::new_for_test(0),
2451                AuthorityIndex::new_for_test(1),
2452            ])
2453            .skip_block()
2454            .build();
2455        dag_builder
2456            .layers(3..=3)
2457            .authorities(vec![
2458                AuthorityIndex::new_for_test(0),
2459                AuthorityIndex::new_for_test(1),
2460                AuthorityIndex::new_for_test(2),
2461            ])
2462            .skip_block()
2463            .build();
2464
2465        // Accept all blocks
2466        for block in dag_builder.all_blocks() {
2467            dag_state.accept_block(block);
2468        }
2469
2470        dag_state.add_commit(TrustedCommit::new_for_test(
2471            1 as CommitIndex,
2472            CommitDigest::MIN,
2473            0,
2474            dag_builder.leader_block(3).unwrap().reference(),
2475            vec![],
2476        ));
2477
2478        // Flush the store so we update the evict rounds
2479        dag_state.flush();
2480
2481        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2482        dag_state.get_last_cached_block_per_authority(2);
2483    }
2484
2485    #[tokio::test]
2486    async fn test_last_quorum() {
2487        // GIVEN
2488        let (context, _) = Context::new_for_test(4);
2489        let context = Arc::new(context);
2490        let store = Arc::new(MemStore::new());
2491        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2492
2493        // WHEN no blocks exist then genesis should be returned
2494        {
2495            let genesis = genesis_blocks(context.as_ref());
2496
2497            assert_eq!(dag_state.read().last_quorum(), genesis);
2498        }
2499
2500        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2501        {
2502            let mut dag_builder = DagBuilder::new(context.clone());
2503            dag_builder
2504                .layers(1..=4)
2505                .build()
2506                .persist_layers(dag_state.clone());
2507            let round_4_blocks: Vec<_> = dag_builder
2508                .blocks(4..=4)
2509                .into_iter()
2510                .map(|block| block.reference())
2511                .collect();
2512
2513            let last_quorum = dag_state.read().last_quorum();
2514
2515            assert_eq!(
2516                last_quorum
2517                    .into_iter()
2518                    .map(|block| block.reference())
2519                    .collect::<Vec<_>>(),
2520                round_4_blocks
2521            );
2522        }
2523
2524        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2525        {
2526            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2527            dag_state.write().accept_block(block);
2528
2529            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2530
2531            let last_quorum = dag_state.read().last_quorum();
2532
2533            assert_eq!(last_quorum, round_4_blocks);
2534        }
2535    }
2536
2537    #[tokio::test]
2538    async fn test_last_block_for_authority() {
2539        // GIVEN
2540        let (context, _) = Context::new_for_test(4);
2541        let context = Arc::new(context);
2542        let store = Arc::new(MemStore::new());
2543        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2544
2545        // WHEN no blocks exist then genesis should be returned
2546        {
2547            let genesis = genesis_blocks(context.as_ref());
2548            let my_genesis = genesis
2549                .into_iter()
2550                .find(|block| block.author() == context.own_index)
2551                .unwrap();
2552
2553            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2554        }
2555
2556        // WHEN adding some blocks for authorities, only the last ones should be returned
2557        {
2558            // add blocks up to round 4
2559            let mut dag_builder = DagBuilder::new(context.clone());
2560            dag_builder
2561                .layers(1..=4)
2562                .build()
2563                .persist_layers(dag_state.clone());
2564
2565            // add block 5 for authority 0
2566            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2567            dag_state.write().accept_block(block);
2568
2569            let block = dag_state
2570                .read()
2571                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2572            assert_eq!(block.round(), 5);
2573
2574            for (authority_index, _) in context.committee.authorities() {
2575                let block = dag_state
2576                    .read()
2577                    .get_last_block_for_authority(authority_index);
2578
2579                if authority_index.value() == 0 {
2580                    assert_eq!(block.round(), 5);
2581                } else {
2582                    assert_eq!(block.round(), 4);
2583                }
2584            }
2585        }
2586    }
2587
2588    #[tokio::test]
2589    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2590        // GIVEN
2591        let (context, _) = Context::new_for_test(4);
2592        let context = Arc::new(context);
2593        let store = Arc::new(MemStore::new());
2594        let mut dag_state = DagState::new(context.clone(), store.clone());
2595
2596        // Set a timestamp for the block that is ahead of the current time
2597        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2598
2599        let block = VerifiedBlock::new_for_test(
2600            TestBlock::new(10, 0)
2601                .set_timestamp_ms(block_timestamp)
2602                .build(),
2603        );
2604
2605        // Try to accept the block - it should not panic
2606        dag_state.accept_block(block);
2607    }
2608
2609    #[tokio::test]
2610    async fn test_last_finalized_commit() {
2611        // GIVEN
2612        let (context, _) = Context::new_for_test(4);
2613        let context = Arc::new(context);
2614        let store = Arc::new(MemStore::new());
2615        let mut dag_state = DagState::new(context.clone(), store.clone());
2616
2617        // WHEN adding a finalized commit
2618        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2619        let rejected_transactions = BTreeMap::new();
2620        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2621
2622        // THEN the commit should be added to the buffer
2623        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2624        assert_eq!(
2625            dag_state.finalized_commits_to_write[0],
2626            (commit_ref, rejected_transactions.clone())
2627        );
2628
2629        // WHEN flushing the DAG state
2630        dag_state.flush();
2631
2632        // THEN the commit and rejected transactions should be written to storage
2633        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2634        assert_eq!(last_finalized_commit, Some(commit_ref));
2635        let stored_rejected_transactions = store
2636            .read_rejected_transactions(commit_ref)
2637            .unwrap()
2638            .unwrap();
2639        assert_eq!(stored_rejected_transactions, rejected_transactions);
2640    }
2641}