consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21    CommittedSubDag,
22    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23    commit::{
24        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26    },
27    context::Context,
28    leader_scoring::{ReputationScores, ScoringSubdag},
29    storage::{Store, WriteBatch},
30    threshold_clock::ThresholdClock,
31};
32
/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
///
/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
/// concurrent access from multiple components.
pub struct DagState {
    context: Arc<Context>,

    // The genesis blocks
    genesis: BTreeMap<BlockRef, VerifiedBlock>,

    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
    // Note: all uncommitted blocks are kept in memory.
    //
    // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always has available
    // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS.
    // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`.
    // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached.
    recent_blocks: BTreeMap<BlockRef, BlockInfo>,

    // Indexes recent block refs by their authorities.
    // Vec position corresponds to the authority index.
    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

    // Keeps track of the threshold clock for proposing blocks.
    threshold_clock: ThresholdClock,

    // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
    // should be considered evicted, and if any such blocks exist we should not consider them causally complete.
    // The `evicted_rounds` size should be the same as the committee size.
    evicted_rounds: Vec<Round>,

    // Highest round of blocks accepted.
    highest_accepted_round: Round,

    // Last consensus commit of the dag.
    last_commit: Option<TrustedCommit>,

    // Last wall time when commit round advanced. Does not persist across restarts.
    last_commit_round_advancement_time: Option<std::time::Instant>,

    // Last committed rounds per authority.
    last_committed_rounds: Vec<Round>,

    /// The committed subdags that have been scored but scores have not been used
    /// for leader schedule yet.
    scoring_subdag: ScoringSubdag,

    // Commit votes pending to be included in new blocks.
    // TODO: limit to 1st commit per round with multi-leader.
    // TODO: recover unproposed pending commit votes at startup.
    pending_commit_votes: VecDeque<CommitVote>,

    // Blocks and commits must be buffered for persistence before they can be
    // inserted into the local DAG or sent to output.
    blocks_to_write: Vec<VerifiedBlock>,
    commits_to_write: Vec<TrustedCommit>,

    // Buffers the reputation scores & last_committed_rounds to be flushed with the
    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
    // & last_committed_rounds from the commits as needed.
    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

    // Buffers finalized commits and their rejected transactions to be written to storage.
    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,

    // Persistent storage for blocks, commits and other consensus data.
    store: Arc<dyn Store>,

    // The number of cached rounds
    cached_rounds: Round,
}
107
108impl DagState {
    /// Initializes DagState from storage.
    ///
    /// Recovery proceeds in several stages:
    /// 1. Reads the last commit and last `CommitInfo` to recover `last_committed_rounds`.
    /// 2. Scans commits after the recovered `CommitInfo` to rebuild the scoring subdags.
    /// 3. Loads recent (non-evicted) blocks per authority back into the in-memory cache.
    /// 4. Walks commits backwards (until the GC round) to restore per-block committed flags.
    /// 5. Re-links the causal history of this authority's own proposed blocks.
    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
        let num_authorities = context.committee.size();

        let genesis = genesis_blocks(context.as_ref())
            .into_iter()
            .map(|block| (block.reference(), block))
            .collect();

        let threshold_clock = ThresholdClock::new(1, context.clone());

        let last_commit = store
            .read_last_commit()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));

        // Recover last_committed_rounds from the latest persisted CommitInfo when present;
        // otherwise start from all-zero rounds and replay commits from the beginning.
        let commit_info = store
            .read_last_commit_info()
            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
        let (mut last_committed_rounds, commit_recovery_start_index) =
            if let Some((commit_ref, commit_info)) = commit_info {
                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
                (commit_info.committed_rounds, commit_ref.index + 1)
            } else {
                tracing::info!("Found no stored CommitInfo to recover from");
                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
            };

        let mut unscored_committed_subdags = Vec::new();
        let mut scoring_subdag = ScoringSubdag::new(context.clone());

        // Replay commits after the recovered CommitInfo to bring last_committed_rounds
        // up to date and collect the subdags that still need scoring.
        if let Some(last_commit) = last_commit.as_ref() {
            store
                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
                .iter()
                .for_each(|commit| {
                    for block_ref in commit.blocks() {
                        last_committed_rounds[block_ref.author] =
                            max(last_committed_rounds[block_ref.author], block_ref.round);
                    }
                    let committed_subdag = load_committed_subdag_from_store(
                        &context,
                        store.as_ref(),
                        commit.clone(),
                        vec![],
                    );
                    // We don't need to recover reputation scores for unscored_committed_subdags
                    unscored_committed_subdags.push(committed_subdag);
                });
        }

        tracing::info!(
            "DagState was initialized with the following state: \
            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
            unscored_committed_subdags.len()
        );

        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));

        let mut state = Self {
            context: context.clone(),
            genesis,
            recent_blocks: BTreeMap::new(),
            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
            threshold_clock,
            highest_accepted_round: 0,
            last_commit: last_commit.clone(),
            last_commit_round_advancement_time: None,
            last_committed_rounds: last_committed_rounds.clone(),
            pending_commit_votes: VecDeque::new(),
            blocks_to_write: vec![],
            commits_to_write: vec![],
            commit_info_to_write: vec![],
            finalized_commits_to_write: vec![],
            scoring_subdag,
            store: store.clone(),
            cached_rounds,
            evicted_rounds: vec![0; num_authorities],
        };

        // Reload each authority's non-evicted blocks into the in-memory cache.
        for (authority_index, _) in context.committee.authorities() {
            let (blocks, eviction_round) = {
                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
                // As a reminder, the eviction round takes into account the gc_round.
                let last_block = state
                    .store
                    .scan_last_blocks_by_author(authority_index, 1, None)
                    .expect("Database error");
                let last_block_round = last_block
                    .last()
                    .map(|b| b.round())
                    .unwrap_or(GENESIS_ROUND);

                let eviction_round =
                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
                let blocks = state
                    .store
                    .scan_blocks_by_author(authority_index, eviction_round + 1)
                    .expect("Database error");

                (blocks, eviction_round)
            };

            state.evicted_rounds[authority_index] = eviction_round;

            // Update the block metadata for the authority.
            for block in &blocks {
                state.update_block_metadata(block);
            }

            debug!(
                "Recovered blocks {}: {:?}",
                authority_index,
                blocks
                    .iter()
                    .map(|b| b.reference())
                    .collect::<Vec<BlockRef>>()
            );
        }

        // Walk commits backwards from the last commit, restoring the `committed` flag
        // on each cached block, until reaching a commit whose leader is at or below gc_round.
        if let Some(last_commit) = last_commit {
            let mut index = last_commit.index();
            let gc_round = state.gc_round();
            info!(
                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
                index, gc_round
            );

            loop {
                let commits = store
                    .scan_commits((index..=index).into())
                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
                let Some(commit) = commits.first() else {
                    info!("Recovering finished up to index {index}, no more commits to recover");
                    break;
                };

                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
                if gc_round > 0 && commit.leader().round <= gc_round {
                    info!(
                        "Recovering finished, reached commit leader round {} <= gc_round {}",
                        commit.leader().round,
                        gc_round
                    );
                    break;
                }

                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
                    debug!(
                        "Setting block {:?} as committed based on commit {:?}",
                        block_ref,
                        commit.index()
                    );
                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
                });

                // All commits are indexed starting from 1, so once we reach zero we exit.
                index = index.saturating_sub(1);
                if index == 0 {
                    break;
                }
            }
        }

        // Recover hard linked statuses for blocks within GC round.
        let proposed_blocks = store
            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
            .expect("Database error");
        for block in proposed_blocks {
            state.link_causal_history(block.reference());
        }

        state
    }
284
285    /// Accepts a block into DagState and keeps it in memory.
286    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
287        assert_ne!(
288            block.round(),
289            0,
290            "Genesis block should not be accepted into DAG."
291        );
292
293        let block_ref = block.reference();
294        if self.contains_block(&block_ref) {
295            return;
296        }
297
298        let now = self.context.clock.timestamp_utc_ms();
299        if block.timestamp_ms() > now {
300            trace!(
301                "Block {:?} with timestamp {} is greater than local timestamp {}.",
302                block,
303                block.timestamp_ms(),
304                now,
305            );
306        }
307        let hostname = &self.context.committee.authority(block_ref.author).hostname;
308        self.context
309            .metrics
310            .node_metrics
311            .accepted_block_time_drift_ms
312            .with_label_values(&[hostname])
313            .inc_by(block.timestamp_ms().saturating_sub(now));
314
315        // TODO: Move this check to core
316        // Ensure we don't write multiple blocks per slot for our own index
317        if block_ref.author == self.context.own_index {
318            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
319            if !self
320                .context
321                .parameters
322                .internal
323                .skip_equivocation_validation
324            {
325                assert!(
326                    existing_blocks.is_empty(),
327                    "Block Rejected! Attempted to add block {block:#?} to own slot where \
328                    block(s) {existing_blocks:#?} already exists."
329                );
330            }
331        }
332        self.update_block_metadata(&block);
333        self.blocks_to_write.push(block);
334        let source = if self.context.own_index == block_ref.author {
335            "own"
336        } else {
337            "others"
338        };
339        self.context
340            .metrics
341            .node_metrics
342            .accepted_blocks
343            .with_label_values(&[source])
344            .inc();
345    }
346
347    /// Updates internal metadata for a block.
348    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
349        let block_ref = block.reference();
350        self.recent_blocks
351            .insert(block_ref, BlockInfo::new(block.clone()));
352        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
353
354        if self.threshold_clock.add_block(block_ref) {
355            // Do not measure quorum delay when no local block is proposed in the round.
356            let last_proposed_block = self.get_last_proposed_block();
357            if last_proposed_block.round() == block_ref.round {
358                let quorum_delay_ms = self
359                    .context
360                    .clock
361                    .timestamp_utc_ms()
362                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
363                self.context
364                    .metrics
365                    .node_metrics
366                    .quorum_receive_latency
367                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
368            }
369        }
370
371        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
372        self.context
373            .metrics
374            .node_metrics
375            .highest_accepted_round
376            .set(self.highest_accepted_round as i64);
377
378        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
379            .last()
380            .map(|block_ref| block_ref.round)
381            .expect("There should be by now at least one block ref");
382        let hostname = &self.context.committee.authority(block_ref.author).hostname;
383        self.context
384            .metrics
385            .node_metrics
386            .highest_accepted_authority_round
387            .with_label_values(&[hostname])
388            .set(highest_accepted_round_for_author as i64);
389    }
390
391    /// Accepts a blocks into DagState and keeps it in memory.
392    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
393        debug!(
394            "Accepting blocks: {}",
395            blocks.iter().map(|b| b.reference().to_string()).join(",")
396        );
397        for block in blocks {
398            self.accept_block(block);
399        }
400    }
401
402    /// Gets a block by checking cached recent blocks then storage.
403    /// Returns None when the block is not found.
404    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
405        self.get_blocks(&[*reference])
406            .pop()
407            .expect("Exactly one element should be returned")
408    }
409
410    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
411    /// An element is None when the corresponding block is not found.
412    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
413        let mut blocks = vec![None; block_refs.len()];
414        let mut missing = Vec::new();
415
416        for (index, block_ref) in block_refs.iter().enumerate() {
417            if block_ref.round == GENESIS_ROUND {
418                // Allow the caller to handle the invalid genesis ancestor error.
419                if let Some(block) = self.genesis.get(block_ref) {
420                    blocks[index] = Some(block.clone());
421                }
422                continue;
423            }
424            if let Some(block_info) = self.recent_blocks.get(block_ref) {
425                blocks[index] = Some(block_info.block.clone());
426                continue;
427            }
428            missing.push((index, block_ref));
429        }
430
431        if missing.is_empty() {
432            return blocks;
433        }
434
435        let missing_refs = missing
436            .iter()
437            .map(|(_, block_ref)| **block_ref)
438            .collect::<Vec<_>>();
439        let store_results = self
440            .store
441            .read_blocks(&missing_refs)
442            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
443        self.context
444            .metrics
445            .node_metrics
446            .dag_state_store_read_count
447            .with_label_values(&["get_blocks"])
448            .inc();
449
450        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
451            blocks[index] = result;
452        }
453
454        blocks
455    }
456
457    /// Gets all uncommitted blocks in a slot.
458    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
459    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
460        // TODO: either panic below when the slot is at or below the last committed round,
461        // or support reading from storage while limiting storage reads to edge cases.
462
463        let mut blocks = vec![];
464        for (_block_ref, block_info) in self.recent_blocks.range((
465            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
466            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
467        )) {
468            blocks.push(block_info.block.clone())
469        }
470        blocks
471    }
472
473    /// Gets all uncommitted blocks in a round.
474    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
475    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
476        if round <= self.last_commit_round() {
477            panic!("Round {} have committed blocks!", round);
478        }
479
480        let mut blocks = vec![];
481        for (_block_ref, block_info) in self.recent_blocks.range((
482            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
483            Excluded(BlockRef::new(
484                round + 1,
485                AuthorityIndex::ZERO,
486                BlockDigest::MIN,
487            )),
488        )) {
489            blocks.push(block_info.block.clone())
490        }
491        blocks
492    }
493
494    /// Gets all ancestors in the history of a block at a certain round.
495    pub(crate) fn ancestors_at_round(
496        &self,
497        later_block: &VerifiedBlock,
498        earlier_round: Round,
499    ) -> Vec<VerifiedBlock> {
500        // Iterate through ancestors of later_block in round descending order.
501        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
502        while !linked.is_empty() {
503            let round = linked.last().unwrap().round;
504            // Stop after finishing traversal for ancestors above earlier_round.
505            if round <= earlier_round {
506                break;
507            }
508            let block_ref = linked.pop_last().unwrap();
509            let Some(block) = self.get_block(&block_ref) else {
510                panic!("Block {:?} should exist in DAG!", block_ref);
511            };
512            linked.extend(block.ancestors().iter().cloned());
513        }
514        linked
515            .range((
516                Included(BlockRef::new(
517                    earlier_round,
518                    AuthorityIndex::ZERO,
519                    BlockDigest::MIN,
520                )),
521                Unbounded,
522            ))
523            .map(|r| {
524                self.get_block(r)
525                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
526                    .clone()
527            })
528            .collect()
529    }
530
    /// Gets the last proposed block from this authority.
    /// If no block is proposed yet, returns the genesis block.
    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
        // Delegates to the per-authority lookup using our own authority index.
        self.get_last_block_for_authority(self.context.own_index)
    }
536
537    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
538    /// then the genesis block is returned as no other block has been received from that authority.
539    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
540        if let Some(last) = self.recent_refs_by_authority[authority].last() {
541            return self
542                .recent_blocks
543                .get(last)
544                .expect("Block should be found in recent blocks")
545                .block
546                .clone();
547        }
548
549        // if none exists, then fallback to genesis
550        let (_, genesis_block) = self
551            .genesis
552            .iter()
553            .find(|(block_ref, _)| block_ref.author == authority)
554            .expect("Genesis should be found for authority {authority_index}");
555        genesis_block.clone()
556    }
557
    /// Returns cached recent blocks from the specified authority.
    /// Blocks returned are limited to round >= `start`, and cached.
    /// NOTE: caller should not assume returned blocks are always chained.
    /// "Disconnected" blocks can be returned when there are byzantine blocks,
    /// or a previously evicted block is accepted again.
    pub(crate) fn get_cached_blocks(
        &self,
        authority: AuthorityIndex,
        start: Round,
    ) -> Vec<VerifiedBlock> {
        // Unbounded end round and no limit: return everything cached from `start` onwards.
        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
    }
570
571    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
572    // limited in total number of blocks.
573    pub(crate) fn get_cached_blocks_in_range(
574        &self,
575        authority: AuthorityIndex,
576        start_round: Round,
577        end_round: Round,
578        limit: usize,
579    ) -> Vec<VerifiedBlock> {
580        if start_round >= end_round || limit == 0 {
581            return vec![];
582        }
583
584        let mut blocks = vec![];
585        for block_ref in self.recent_refs_by_authority[authority].range((
586            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
587            Excluded(BlockRef::new(
588                end_round,
589                AuthorityIndex::MIN,
590                BlockDigest::MIN,
591            )),
592        )) {
593            let block_info = self
594                .recent_blocks
595                .get(block_ref)
596                .expect("Block should exist in recent blocks");
597            blocks.push(block_info.block.clone());
598            if blocks.len() >= limit {
599                break;
600            }
601        }
602        blocks
603    }
604
605    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
606    pub(crate) fn get_last_cached_block_in_range(
607        &self,
608        authority: AuthorityIndex,
609        start_round: Round,
610        end_round: Round,
611    ) -> Option<VerifiedBlock> {
612        if start_round >= end_round {
613            return None;
614        }
615
616        let block_ref = self.recent_refs_by_authority[authority]
617            .range((
618                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
619                Excluded(BlockRef::new(
620                    end_round,
621                    AuthorityIndex::MIN,
622                    BlockDigest::MIN,
623                )),
624            ))
625            .last()?;
626
627        self.recent_blocks
628            .get(block_ref)
629            .map(|block_info| block_info.block.clone())
630    }
631
    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
    /// In case of equivocation for an authority's last slot, one block will be returned (the last
    /// in order) and the refs of the other equivocating blocks of that slot are returned alongside it.
    pub(crate) fn get_last_cached_block_per_authority(
        &self,
        end_round: Round,
    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
        // Initialize with the genesis blocks as fallback
        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];

        if end_round == GENESIS_ROUND {
            panic!(
                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
            );
        }

        // Only genesis rounds qualify: no equivocation possible there.
        if end_round == GENESIS_ROUND + 1 {
            return blocks.into_iter().map(|b| (b, vec![])).collect();
        }

        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
            let authority_index = self
                .context
                .committee
                .to_authority_index(authority_index)
                .unwrap();

            // Panic if the caller asks for rounds that have already been evicted from cache.
            let last_evicted_round = self.evicted_rounds[authority_index];
            if end_round.saturating_sub(1) <= last_evicted_round {
                panic!(
                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
                );
            }

            // Scan this authority's cached refs below end_round, newest first.
            let block_ref_iter = block_refs
                .range((
                    Included(BlockRef::new(
                        last_evicted_round + 1,
                        authority_index,
                        BlockDigest::MIN,
                    )),
                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
                ))
                .rev();

            // `last_round == 0` means the latest block has not been picked yet; once it is,
            // any further refs in the same round are equivocations.
            let mut last_round = 0;
            for block_ref in block_ref_iter {
                if last_round == 0 {
                    last_round = block_ref.round;
                    let block_info = self
                        .recent_blocks
                        .get(block_ref)
                        .expect("Block should exist in recent blocks");
                    blocks[authority_index] = block_info.block.clone();
                    continue;
                }
                // Refs are visited in descending order, so a lower round means we have
                // moved past the latest slot — stop collecting.
                if block_ref.round < last_round {
                    break;
                }
                equivocating_blocks[authority_index].push(*block_ref);
            }
        }

        blocks.into_iter().zip(equivocating_blocks).collect()
    }
701
702    /// Checks whether a block exists in the slot. The method checks only against the cached data.
703    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
704    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
705        // Always return true for genesis slots.
706        if slot.round == GENESIS_ROUND {
707            return true;
708        }
709
710        let eviction_round = self.evicted_rounds[slot.authority];
711        if slot.round <= eviction_round {
712            panic!(
713                "{}",
714                format!(
715                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
716                )
717            );
718        }
719
720        let mut result = self.recent_refs_by_authority[slot.authority].range((
721            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
722            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
723        ));
724        result.next().is_some()
725    }
726
727    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
728    /// back the results, so its expensive if keep asking for cache missing blocks.
729    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730        let mut exist = vec![false; block_refs.len()];
731        let mut missing = Vec::new();
732
733        for (index, block_ref) in block_refs.into_iter().enumerate() {
734            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736                exist[index] = true;
737            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738            {
739                // Optimization: recent_refs contain the most recent blocks known to this authority.
740                // If a block ref is not found there and has a higher round, it definitely is
741                // missing from this authority and there is no need to check disk.
742                exist[index] = false;
743            } else {
744                missing.push((index, block_ref));
745            }
746        }
747
748        if missing.is_empty() {
749            return exist;
750        }
751
752        let missing_refs = missing
753            .iter()
754            .map(|(_, block_ref)| *block_ref)
755            .collect::<Vec<_>>();
756        let store_results = self
757            .store
758            .contains_blocks(&missing_refs)
759            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
760        self.context
761            .metrics
762            .node_metrics
763            .dag_state_store_read_count
764            .with_label_values(&["contains_blocks"])
765            .inc();
766
767        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
768            exist[index] = result;
769        }
770
771        exist
772    }
773
774    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
775        let blocks = self.contains_blocks(vec![*block_ref]);
776        blocks.first().cloned().unwrap()
777    }
778
779    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
780    // Method will panic if the block is not found in the cache.
781    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
782        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
783            if !block_info.committed {
784                block_info.committed = true;
785                return true;
786            }
787            false
788        } else {
789            panic!(
790                "Block {:?} not found in cache to set as committed.",
791                block_ref
792            );
793        }
794    }
795
796    /// Returns true if the block is committed. Only valid for blocks above the GC round.
797    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
798        self.recent_blocks
799            .get(block_ref)
800            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
801            .committed
802    }
803
804    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
805    /// Returns the list of blocks that are newly linked.
806    /// The returned blocks are guaranteed to be above the GC round.
807    /// Transaction votes for the returned blocks are retrieved and carried by the upcoming
808    /// proposed block.
809    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
810        let gc_round = self.gc_round();
811        let mut linked_blocks = vec![];
812        let mut targets = VecDeque::new();
813        targets.push_back(root_block);
814        while let Some(block_ref) = targets.pop_front() {
815            // No need to collect or mark blocks at or below GC round.
816            // These blocks and their causal history will not be included in new commits.
817            // And their transactions do not need votes to finalize or skip.
818            //
819            // CommitFinalizer::gced_transaction_votes_for_pending_block() is the counterpart
820            // to this logic, when deciding if block A in the causal history of block B gets
821            // implicit accept transaction votes from block B.
822            if block_ref.round <= gc_round {
823                continue;
824            }
825            let block_info = self
826                .recent_blocks
827                .get_mut(&block_ref)
828                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
829            if block_info.included {
830                continue;
831            }
832            linked_blocks.push(block_ref);
833            block_info.included = true;
834            targets.extend(block_info.block.ancestors().iter());
835        }
836        linked_blocks
837    }
838
839    /// Returns true if the block has been included in an owned proposed block.
840    /// NOTE: caller should make sure only blocks above GC round are queried.
841    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
842        self.recent_blocks
843            .get(block_ref)
844            .unwrap_or_else(|| {
845                panic!(
846                    "Attempted to query for inclusion status for a block not in cached data {}",
847                    block_ref
848                )
849            })
850            .included
851    }
852
    /// Returns the current round of the threshold clock.
    pub(crate) fn threshold_clock_round(&self) -> Round {
        self.threshold_clock.get_round()
    }

    /// The timestamp of when quorum threshold was last reached in the threshold clock.
    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
        self.threshold_clock.get_quorum_ts()
    }

    /// Returns the highest round accepted so far, as tracked by this DagState.
    pub(crate) fn highest_accepted_round(&self) -> Round {
        self.highest_accepted_round
    }
865
    // Buffers a new commit in memory and updates last committed rounds.
    // REQUIRED: must not skip over any commit index.
    // A commit with an index <= the last commit is ignored (with an error log);
    // a gap in indices or a timestamp regression panics.
    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
        // Milliseconds elapsed since the previous commit, used for metrics below.
        let time_diff = if let Some(last_commit) = &self.last_commit {
            if commit.index() <= last_commit.index() {
                error!(
                    "New commit index {} <= last commit index {}!",
                    commit.index(),
                    last_commit.index()
                );
                return;
            }
            // Commits must arrive consecutively, without skipping an index.
            assert_eq!(commit.index(), last_commit.index() + 1);

            if commit.timestamp_ms() < last_commit.timestamp_ms() {
                panic!(
                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
                    last_commit, commit
                );
            }
            commit
                .timestamp_ms()
                .saturating_sub(last_commit.timestamp_ms())
        } else {
            // The very first commit must have index 1.
            assert_eq!(commit.index(), 1);
            0
        };

        self.context
            .metrics
            .node_metrics
            .last_commit_time_diff
            .observe(time_diff as f64);

        // Whether this commit's leader round is higher than the previous commit's.
        // Computed before last_commit is overwritten below.
        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
            previous_commit.round() < commit.round()
        } else {
            true
        };

        self.last_commit = Some(commit.clone());

        if commit_round_advanced {
            let now = std::time::Instant::now();
            if let Some(previous_time) = self.last_commit_round_advancement_time {
                self.context
                    .metrics
                    .node_metrics
                    .commit_round_advancement_interval
                    .observe(now.duration_since(previous_time).as_secs_f64())
            }
            self.last_commit_round_advancement_time = Some(now);
        }

        // Advance the last committed round of each authority with blocks in this commit.
        for block_ref in commit.blocks().iter() {
            self.last_committed_rounds[block_ref.author] = max(
                self.last_committed_rounds[block_ref.author],
                block_ref.round,
            );
        }

        // Report the last committed round per authority.
        for (i, round) in self.last_committed_rounds.iter().enumerate() {
            let index = self.context.committee.to_authority_index(i).unwrap();
            let hostname = &self.context.committee.authority(index).hostname;
            self.context
                .metrics
                .node_metrics
                .last_committed_authority_round
                .with_label_values(&[hostname])
                .set((*round).into());
        }

        // Queue a vote for this commit and buffer it for persistence at the next flush.
        self.pending_commit_votes.push_back(commit.reference());
        self.commits_to_write.push(commit);
    }
941
    /// Recovers commits to write from storage, at startup.
    /// The recovered commits are appended to the write buffer, to be persisted on the next flush.
    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
        self.commits_to_write.extend(commits);
    }
946
947    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
948        assert!(
949            self.commits_to_write.is_empty(),
950            "Commits to write should be empty. {:?}",
951            self.commits_to_write,
952        );
953    }
954
955    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
956        // We create an empty scoring subdag once reputation scores are calculated.
957        // Note: It is okay for this to not be gated by protocol config as the
958        // scoring_subdag should be empty in either case at this point.
959        assert!(self.scoring_subdag.is_empty());
960
961        let commit_info = CommitInfo {
962            committed_rounds: self.last_committed_rounds.clone(),
963            reputation_scores,
964        };
965        let last_commit = self
966            .last_commit
967            .as_ref()
968            .expect("Last commit should already be set.");
969        self.commit_info_to_write
970            .push((last_commit.reference(), commit_info));
971    }
972
973    pub(crate) fn add_finalized_commit(
974        &mut self,
975        commit_ref: CommitRef,
976        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
977    ) {
978        self.finalized_commits_to_write
979            .push((commit_ref, rejected_transactions));
980    }
981
982    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
983        let mut votes = Vec::new();
984        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
985            votes.push(self.pending_commit_votes.pop_front().unwrap());
986        }
987        votes
988    }
989
990    /// Index of the last commit.
991    pub(crate) fn last_commit_index(&self) -> CommitIndex {
992        match &self.last_commit {
993            Some(commit) => commit.index(),
994            None => 0,
995        }
996    }
997
998    /// Digest of the last commit.
999    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1000        match &self.last_commit {
1001            Some(commit) => commit.digest(),
1002            None => CommitDigest::MIN,
1003        }
1004    }
1005
1006    /// Timestamp of the last commit.
1007    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1008        match &self.last_commit {
1009            Some(commit) => commit.timestamp_ms(),
1010            None => 0,
1011        }
1012    }
1013
1014    /// Leader slot of the last commit.
1015    pub(crate) fn last_commit_leader(&self) -> Slot {
1016        match &self.last_commit {
1017            Some(commit) => commit.leader().into(),
1018            None => self
1019                .genesis
1020                .iter()
1021                .next()
1022                .map(|(genesis_ref, _)| *genesis_ref)
1023                .expect("Genesis blocks should always be available.")
1024                .into(),
1025        }
1026    }
1027
1028    /// Highest round where a block is committed, which is last commit's leader round.
1029    pub(crate) fn last_commit_round(&self) -> Round {
1030        match &self.last_commit {
1031            Some(commit) => commit.leader().round,
1032            None => 0,
1033        }
1034    }
1035
1036    /// Last committed round per authority.
1037    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1038        self.last_committed_rounds.clone()
1039    }
1040
    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
    /// committed leader round. When GC is disabled that will return the genesis round.
    /// Equivalent to `calculate_gc_round(last_commit_round())`.
    pub(crate) fn gc_round(&self) -> Round {
        self.calculate_gc_round(self.last_commit_round())
    }
1047
1048    /// Calculates the GC round from the input leader round, which can be different
1049    /// from the last committed leader round.
1050    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1051        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1052    }
1053
    /// Flushes unpersisted blocks, commits and commit info to storage.
    ///
    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
    /// must also be buffered.
    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
    ///
    /// After each flush, DagState becomes persisted in storage and it expected to recover
    /// all internal states from storage after restarts.
    pub(crate) fn flush(&mut self) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["DagState::flush"])
            .start_timer();

        // Flush buffered data to storage.
        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
        let pending_commits = std::mem::take(&mut self.commits_to_write);
        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
        // Nothing buffered: skip both the storage write and the cache eviction below.
        if pending_blocks.is_empty()
            && pending_commits.is_empty()
            && pending_commit_info.is_empty()
            && pending_finalized_commits.is_empty()
        {
            return;
        }

        debug!(
            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
            pending_blocks.len(),
            pending_blocks
                .iter()
                .map(|b| b.reference().to_string())
                .join(","),
            pending_commits.len(),
            pending_commits
                .iter()
                .map(|c| c.reference().to_string())
                .join(","),
            pending_commit_info.len(),
            pending_commit_info
                .iter()
                .map(|(commit_ref, _)| commit_ref.to_string())
                .join(","),
            pending_finalized_commits.len(),
            pending_finalized_commits
                .iter()
                .map(|(commit_ref, _)| commit_ref.to_string())
                .join(","),
        );
        // All buffered data is written as a single batch, per the consistency
        // requirement in the doc comment above.
        self.store
            .write(WriteBatch::new(
                pending_blocks,
                pending_commits,
                pending_commit_info,
                pending_finalized_commits,
            ))
            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
        self.context
            .metrics
            .node_metrics
            .dag_state_store_write_count
            .inc();

        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
        // Refs are evicted in ascending round order, stopping at the first ref above the eviction round.
        for (authority_index, _) in self.context.committee.authorities() {
            let eviction_round = self.calculate_authority_eviction_round(authority_index);
            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
                if block_ref.round <= eviction_round {
                    self.recent_blocks.remove(block_ref);
                    self.recent_refs_by_authority[authority_index].pop_first();
                } else {
                    break;
                }
            }
            self.evicted_rounds[authority_index] = eviction_round;
        }

        // Report cache sizes after eviction.
        let metrics = &self.context.metrics.node_metrics;
        metrics
            .dag_state_recent_blocks
            .set(self.recent_blocks.len() as i64);
        metrics.dag_state_recent_refs.set(
            self.recent_refs_by_authority
                .iter()
                .map(BTreeSet::len)
                .sum::<usize>() as i64,
        );
    }
1147
1148    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1149        self.store
1150            .read_last_commit_info()
1151            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1152    }
1153
    /// Adds committed subdags to the scoring subdag, for reputation score calculation.
    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
        self.scoring_subdag.add_subdags(scoring_subdags);
    }

    /// Clears the accumulated scoring subdag state.
    pub(crate) fn clear_scoring_subdag(&mut self) {
        self.scoring_subdag.clear();
    }

    /// Number of subdags accumulated for scoring so far.
    pub(crate) fn scoring_subdags_count(&self) -> usize {
        self.scoring_subdag.scored_subdags_count()
    }

    /// Whether no subdags have been accumulated for scoring.
    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
        self.scoring_subdag.is_empty()
    }

    /// Calculates reputation scores from the accumulated subdags via distributed vote scoring.
    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
        self.scoring_subdag.calculate_distributed_vote_scores()
    }

    /// The (inclusive) end of the commit range covered by the scoring subdag.
    /// Panics if no commit range has been set yet.
    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
        self.scoring_subdag
            .commit_range
            .as_ref()
            .expect("commit range should exist for scoring subdag")
            .end()
    }
1181
1182    /// The last round that should get evicted after a cache clean up operation. After this round we are
1183    /// guaranteed to have all the produced blocks from that authority. For any round that is
1184    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1185    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1186        let last_round = self.recent_refs_by_authority[authority_index]
1187            .last()
1188            .map(|block_ref| block_ref.round)
1189            .unwrap_or(GENESIS_ROUND);
1190
1191        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1192    }
1193
1194    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1195    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1196    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1197        gc_round.min(last_round.saturating_sub(cached_rounds))
1198    }
1199
    /// Returns a cloned handle to the underlying store.
    pub(crate) fn store(&self) -> Arc<dyn Store> {
        self.store.clone()
    }
1204
1205    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1206    /// the quorum even if that's genesis.
1207    #[cfg(test)]
1208    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1209        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1210        // a quorum then it means that our DAG has advanced with missing causal history.
1211        for round in
1212            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213        {
1214            if round == GENESIS_ROUND {
1215                return self.genesis_blocks();
1216            }
1217            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1221            let blocks = self.get_uncommitted_blocks_at_round(round);
1222            for block in &blocks {
1223                if quorum.add(block.author(), &self.context.committee) {
1224                    return blocks;
1225                }
1226            }
1227        }
1228
1229        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1230    }
1231
    /// Returns all genesis blocks. Test-only helper.
    #[cfg(test)]
    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
        self.genesis.values().cloned().collect()
    }

    /// Overrides the last commit directly. Test-only helper.
    #[cfg(test)]
    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
        self.last_commit = Some(commit);
    }
1241}
1242
/// Per-block state cached alongside the block in `recent_blocks`.
struct BlockInfo {
    block: VerifiedBlock,
    // Whether the block has been committed
    committed: bool,
    // Whether the block has been included in the causal history of an owned proposed block.
    //
    // There are two usages of this field:
    // 1. When proposing blocks, determine the set of blocks to carry votes for.
    // 2. When recovering, determine if a block has not been included in a proposed block and
    //    should recover transaction votes by voting.
    included: bool,
}
1255
impl BlockInfo {
    /// Wraps a newly accepted block; it starts as neither committed nor included.
    fn new(block: VerifiedBlock) -> Self {
        Self {
            block,
            committed: false,
            included: false,
        }
    }
}
1265
1266#[cfg(test)]
1267mod test {
1268    use std::vec;
1269
1270    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1271    use parking_lot::RwLock;
1272
1273    use super::*;
1274    use crate::{
1275        block::{TestBlock, VerifiedBlock},
1276        storage::{WriteBatch, mem_store::MemStore},
1277        test_dag_builder::DagBuilder,
1278        test_dag_parser::parse_dag,
1279    };
1280
    // Exercises block lookup by ref, by slot and by round, for both existing and
    // non-existing (uncommitted) blocks.
    #[tokio::test]
    async fn test_get_blocks() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());
        let own_index = AuthorityIndex::new_for_test(0);

        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
        let num_rounds: u32 = 10;
        let non_existent_round: u32 = 100;
        let num_authorities: u32 = 3;
        let num_blocks_per_slot: usize = 3;
        let mut blocks = BTreeMap::new();
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                // Create 3 blocks per slot, with different timestamps and digests.
                let base_ts = round as BlockTimestampMs * 1000;
                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
                    let block = VerifiedBlock::new_for_test(
                        TestBlock::new(round, author)
                            .set_timestamp_ms(timestamp)
                            .build(),
                    );
                    dag_state.accept_block(block.clone());
                    blocks.insert(block.reference(), block);

                    // Only write one block per slot for own index
                    if AuthorityIndex::new_for_test(author) == own_index {
                        break;
                    }
                }
            }
        }

        // Check uncommitted blocks that exist.
        for (r, block) in &blocks {
            assert_eq!(&dag_state.get_block(r).unwrap(), block);
        }

        // Check uncommitted blocks that do not exist.
        // A ref at an existing slot but with a different digest must not resolve.
        let last_ref = blocks.keys().last().unwrap();
        assert!(
            dag_state
                .get_block(&BlockRef::new(
                    last_ref.round,
                    last_ref.author,
                    BlockDigest::MIN
                ))
                .is_none()
        );

        // Check slots with uncommitted blocks.
        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                let slot = Slot::new(
                    round,
                    context
                        .committee
                        .to_authority_index(author as usize)
                        .unwrap(),
                );
                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);

                // We only write one block per slot for own index
                if AuthorityIndex::new_for_test(author) == own_index {
                    assert_eq!(blocks.len(), 1);
                } else {
                    assert_eq!(blocks.len(), num_blocks_per_slot);
                }

                for b in blocks {
                    assert_eq!(b.round(), round);
                    assert_eq!(
                        b.author(),
                        context
                            .committee
                            .to_authority_index(author as usize)
                            .unwrap()
                    );
                }
            }
        }

        // Check slots without uncommitted blocks.
        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());

        // Check rounds with uncommitted blocks.
        for round in 1..=num_rounds {
            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
            // Expect 3 blocks per authority except for own authority which should
            // have 1 block.
            assert_eq!(
                blocks.len(),
                (num_authorities - 1) as usize * num_blocks_per_slot + 1
            );
            for b in blocks {
                assert_eq!(b.round(), round);
            }
        }

        // Check rounds without uncommitted blocks.
        assert!(
            dag_state
                .get_uncommitted_blocks_at_round(non_existent_round)
                .is_empty()
        );
    }
1390
1391    #[tokio::test]
1392    async fn test_ancestors_at_uncommitted_round() {
1393        // Initialize DagState.
1394        let (context, _) = Context::new_for_test(4);
1395        let context = Arc::new(context);
1396        let store = Arc::new(MemStore::new());
1397        let mut dag_state = DagState::new(context.clone(), store.clone());
1398
1399        // Populate DagState.
1400
1401        // Round 10 refs will not have their blocks in DagState.
1402        let round_10_refs: Vec<_> = (0..4)
1403            .map(|a| {
1404                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1405                    .reference()
1406            })
1407            .collect();
1408
1409        // Round 11 blocks.
1410        let round_11 = [
1411            // This will connect to round 12.
1412            VerifiedBlock::new_for_test(
1413                TestBlock::new(11, 0)
1414                    .set_timestamp_ms(1100)
1415                    .set_ancestors(round_10_refs.clone())
1416                    .build(),
1417            ),
1418            // Slot(11, 1) has 3 blocks.
1419            // This will connect to round 12.
1420            VerifiedBlock::new_for_test(
1421                TestBlock::new(11, 1)
1422                    .set_timestamp_ms(1110)
1423                    .set_ancestors(round_10_refs.clone())
1424                    .build(),
1425            ),
1426            // This will connect to round 13.
1427            VerifiedBlock::new_for_test(
1428                TestBlock::new(11, 1)
1429                    .set_timestamp_ms(1111)
1430                    .set_ancestors(round_10_refs.clone())
1431                    .build(),
1432            ),
1433            // This will not connect to any block.
1434            VerifiedBlock::new_for_test(
1435                TestBlock::new(11, 1)
1436                    .set_timestamp_ms(1112)
1437                    .set_ancestors(round_10_refs.clone())
1438                    .build(),
1439            ),
1440            // This will not connect to any block.
1441            VerifiedBlock::new_for_test(
1442                TestBlock::new(11, 2)
1443                    .set_timestamp_ms(1120)
1444                    .set_ancestors(round_10_refs.clone())
1445                    .build(),
1446            ),
1447            // This will connect to round 12.
1448            VerifiedBlock::new_for_test(
1449                TestBlock::new(11, 3)
1450                    .set_timestamp_ms(1130)
1451                    .set_ancestors(round_10_refs.clone())
1452                    .build(),
1453            ),
1454        ];
1455
1456        // Round 12 blocks.
1457        let ancestors_for_round_12 = vec![
1458            round_11[0].reference(),
1459            round_11[1].reference(),
1460            round_11[5].reference(),
1461        ];
1462        let round_12 = [
1463            VerifiedBlock::new_for_test(
1464                TestBlock::new(12, 0)
1465                    .set_timestamp_ms(1200)
1466                    .set_ancestors(ancestors_for_round_12.clone())
1467                    .build(),
1468            ),
1469            VerifiedBlock::new_for_test(
1470                TestBlock::new(12, 2)
1471                    .set_timestamp_ms(1220)
1472                    .set_ancestors(ancestors_for_round_12.clone())
1473                    .build(),
1474            ),
1475            VerifiedBlock::new_for_test(
1476                TestBlock::new(12, 3)
1477                    .set_timestamp_ms(1230)
1478                    .set_ancestors(ancestors_for_round_12.clone())
1479                    .build(),
1480            ),
1481        ];
1482
1483        // Round 13 blocks.
1484        let ancestors_for_round_13 = vec![
1485            round_12[0].reference(),
1486            round_12[1].reference(),
1487            round_12[2].reference(),
1488            round_11[2].reference(),
1489        ];
1490        let round_13 = [
1491            VerifiedBlock::new_for_test(
1492                TestBlock::new(12, 1)
1493                    .set_timestamp_ms(1300)
1494                    .set_ancestors(ancestors_for_round_13.clone())
1495                    .build(),
1496            ),
1497            VerifiedBlock::new_for_test(
1498                TestBlock::new(12, 2)
1499                    .set_timestamp_ms(1320)
1500                    .set_ancestors(ancestors_for_round_13.clone())
1501                    .build(),
1502            ),
1503            VerifiedBlock::new_for_test(
1504                TestBlock::new(12, 3)
1505                    .set_timestamp_ms(1330)
1506                    .set_ancestors(ancestors_for_round_13.clone())
1507                    .build(),
1508            ),
1509        ];
1510
1511        // Round 14 anchor block.
1512        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1513        let anchor = VerifiedBlock::new_for_test(
1514            TestBlock::new(14, 1)
1515                .set_timestamp_ms(1410)
1516                .set_ancestors(ancestors_for_round_14)
1517                .build(),
1518        );
1519
1520        // Add all blocks (at and above round 11) to DagState.
1521        for b in round_11
1522            .iter()
1523            .chain(round_12.iter())
1524            .chain(round_13.iter())
1525            .chain([anchor.clone()].iter())
1526        {
1527            dag_state.accept_block(b.clone());
1528        }
1529
1530        // Check ancestors connected to anchor.
1531        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1532        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1533        ancestors_refs.sort();
1534        let mut expected_refs = vec![
1535            round_11[0].reference(),
1536            round_11[1].reference(),
1537            round_11[2].reference(),
1538            round_11[5].reference(),
1539        ];
1540        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1541        assert_eq!(
1542            ancestors_refs, expected_refs,
1543            "Expected round 11 ancestors: {:?}. Got: {:?}",
1544            expected_refs, ancestors_refs
1545        );
1546    }
1547
    #[tokio::test]
    async fn test_link_causal_history() {
        // Verifies that link_causal_history() marks a block and its not-yet-linked
        // causal history as included, and that GC'ed ancestors are excluded.
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = 10;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(3);
        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();
        dag_builder
            .layers(4..=6)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();

        // Accept all blocks
        let all_blocks = dag_builder.all_blocks();
        dag_state.accept_blocks(all_blocks.clone());

        // No block is linked yet.
        for block in &all_blocks {
            assert!(!dag_state.has_been_included(&block.reference()));
        }

        // Link causal history from a round 1 block.
        // NOTE(review): assumes all_blocks is ordered by round (index 1 is a round 1
        // block) — the assertion below guards that assumption.
        let round_1_block = &all_blocks[1];
        assert_eq!(round_1_block.round(), 1);
        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());

        // Check that only the block itself is newly linked.
        assert_eq!(linked_blocks.len(), 1);
        assert_eq!(linked_blocks[0], round_1_block.reference());
        for block_ref in linked_blocks {
            assert!(dag_state.has_been_included(&block_ref));
        }

        // Link causal history from a round 2 block.
        let round_2_block = &all_blocks[4];
        assert_eq!(round_2_block.round(), 2);
        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());

        // Check the linked blocks: the round 2 block itself plus its round 1 ancestors
        // (4 total newly linked with a 4-authority committee).
        assert_eq!(linked_blocks.len(), 4);
        for block_ref in linked_blocks {
            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
        }

        // Check linked status in dag state.
        for block in &all_blocks {
            if block.round() == 1 || block.reference() == round_2_block.reference() {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }

        // Select round 6 block.
        let round_6_block = all_blocks.last().unwrap();
        assert_eq!(round_6_block.round(), 6);

        // Move the GC round to 3 by committing with a round 6 leader (gc_depth is 3).
        let last_commit = TrustedCommit::new_for_test(
            6,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            round_6_block.reference(),
            vec![],
        );
        dag_state.set_last_commit(last_commit);
        assert_eq!(
            dag_state.gc_round(),
            3,
            "GC round should have moved to round 3"
        );

        // Link causal history from a round 6 block.
        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());

        // Check the linked blocks. They should not include GC'ed blocks (round <= 3).
        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
        for block_ref in linked_blocks {
            assert!(
                block_ref.round == 4
                    || block_ref.round == 5
                    || block_ref == round_6_block.reference()
            );
        }

        // Check linked status in dag state: previously linked blocks stay linked, the
        // rounds 4-6 history was just linked, and GC'ed rounds 2-3 remain unlinked.
        for block in &all_blocks {
            let block_ref = block.reference();
            if block.round() == 1
                || block_ref == round_2_block.reference()
                || block_ref.round == 4
                || block_ref.round == 5
                || block_ref == round_6_block.reference()
            {
                assert!(dag_state.has_been_included(&block.reference()));
            } else {
                assert!(!dag_state.has_been_included(&block.reference()));
            }
        }
    }
1657
1658    #[tokio::test]
1659    async fn test_contains_blocks_in_cache_or_store() {
1660        /// Only keep elements up to 2 rounds before the last committed round
1661        const CACHED_ROUNDS: Round = 2;
1662
1663        let (mut context, _) = Context::new_for_test(4);
1664        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1665
1666        let context = Arc::new(context);
1667        let store = Arc::new(MemStore::new());
1668        let mut dag_state = DagState::new(context.clone(), store.clone());
1669
1670        // Create test blocks for round 1 ~ 10
1671        let num_rounds: u32 = 10;
1672        let num_authorities: u32 = 4;
1673        let mut blocks = Vec::new();
1674
1675        for round in 1..=num_rounds {
1676            for author in 0..num_authorities {
1677                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1678                blocks.push(block);
1679            }
1680        }
1681
1682        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1683        blocks.clone().into_iter().for_each(|block| {
1684            if block.round() <= 4 {
1685                store
1686                    .write(WriteBatch::default().blocks(vec![block]))
1687                    .unwrap();
1688            } else {
1689                dag_state.accept_blocks(vec![block]);
1690            }
1691        });
1692
1693        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1694        // where the blocks of first 4 round should be found in DagState and the rest in store.
1695        let mut block_refs = blocks
1696            .iter()
1697            .map(|block| block.reference())
1698            .collect::<Vec<_>>();
1699        let result = dag_state.contains_blocks(block_refs.clone());
1700
1701        // Ensure everything is found
1702        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1703        assert_eq!(result, expected);
1704
1705        // Now try to ask also for one block ref that is neither in cache nor in store
1706        block_refs.insert(
1707            3,
1708            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1709        );
1710        let result = dag_state.contains_blocks(block_refs.clone());
1711
1712        // Then all should be found apart from the last one
1713        expected.insert(3, false);
1714        assert_eq!(result, expected.clone());
1715    }
1716
    #[tokio::test]
    async fn test_contains_cached_block_at_slot() {
        // Verifies contains_cached_block_at_slot() for genesis, cached, and missing slots.
        /// Only keep elements up to 2 rounds before the last committed round
        const CACHED_ROUNDS: Round = 2;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Create test blocks for round 1 ~ 10 and accept all of them into DagState.
        // No commits are added, so nothing is evicted from the cache yet.
        let num_rounds: u32 = 10;
        let mut blocks = Vec::new();

        for round in 1..=num_rounds {
            for author in 0..num_authorities {
                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
                blocks.push(block.clone());
                dag_state.accept_block(block);
            }
        }

        // Query for genesis round 0, genesis blocks should be returned
        for (author, _) in context.committee.authorities() {
            assert!(
                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
                "Genesis should always be found"
            );
        }

        // Every accepted block should be reported as cached at its slot.
        let mut block_refs = blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();

        for block_ref in block_refs.clone() {
            let slot = block_ref.into();
            let found = dag_state.contains_cached_block_at_slot(slot);
            assert!(found, "A block should be found at slot {}", slot);
        }

        // Now try to ask also for one block ref (round 11) that was never accepted.
        // Then all should be found apart from that one.
        block_refs.insert(
            3,
            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
        );
        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
        expected.insert(3, false);

        // Attempt to check the same for via the contains slot method
        for block_ref in block_refs {
            let slot = block_ref.into();
            let found = dag_state.contains_cached_block_at_slot(slot);

            assert_eq!(expected.remove(0), found);
        }
    }
1780
1781    #[tokio::test]
1782    #[ignore]
1783    #[should_panic(
1784        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1785    )]
1786    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1787        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1788        /// for the correct node operation.
1789        const GC_DEPTH: u32 = 2;
1790        /// Keep at least 3 rounds in cache for each authority.
1791        const CACHED_ROUNDS: Round = 3;
1792
1793        let (mut context, _) = Context::new_for_test(4);
1794        context
1795            .protocol_config
1796            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1797        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1798
1799        let context = Arc::new(context);
1800        let store = Arc::new(MemStore::new());
1801        let mut dag_state = DagState::new(context.clone(), store.clone());
1802
1803        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1804        let mut dag_builder = DagBuilder::new(context.clone());
1805        dag_builder.layers(1..=3).build();
1806        dag_builder
1807            .layers(4..=6)
1808            .authorities(vec![AuthorityIndex::new_for_test(0)])
1809            .skip_block()
1810            .build();
1811
1812        // Accept all blocks
1813        dag_builder
1814            .all_blocks()
1815            .into_iter()
1816            .for_each(|block| dag_state.accept_block(block));
1817
1818        // Now add a commit for leader round 5 to trigger an eviction
1819        dag_state.add_commit(TrustedCommit::new_for_test(
1820            1 as CommitIndex,
1821            CommitDigest::MIN,
1822            0,
1823            dag_builder.leader_block(5).unwrap().reference(),
1824            vec![],
1825        ));
1826        // Flush the DAG state to storage.
1827        dag_state.flush();
1828
1829        // Ensure that gc round has been updated
1830        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1831
1832        // Now what we expect to happen is for:
1833        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1834        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1835        for authority_index in 1..=3 {
1836            for round in 4..=6 {
1837                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1838                    round,
1839                    AuthorityIndex::new_for_test(authority_index)
1840                )));
1841            }
1842        }
1843
1844        for round in 1..=3 {
1845            assert!(
1846                dag_state.contains_cached_block_at_slot(Slot::new(
1847                    round,
1848                    AuthorityIndex::new_for_test(0)
1849                ))
1850            );
1851        }
1852
1853        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1854        // that is <= 3 should be evicted
1855        let _ =
1856            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1857    }
1858
1859    #[tokio::test]
1860    async fn test_get_blocks_in_cache_or_store() {
1861        let (context, _) = Context::new_for_test(4);
1862        let context = Arc::new(context);
1863        let store = Arc::new(MemStore::new());
1864        let mut dag_state = DagState::new(context.clone(), store.clone());
1865
1866        // Create test blocks for round 1 ~ 10
1867        let num_rounds: u32 = 10;
1868        let num_authorities: u32 = 4;
1869        let mut blocks = Vec::new();
1870
1871        for round in 1..=num_rounds {
1872            for author in 0..num_authorities {
1873                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1874                blocks.push(block);
1875            }
1876        }
1877
1878        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1879        blocks.clone().into_iter().for_each(|block| {
1880            if block.round() <= 4 {
1881                store
1882                    .write(WriteBatch::default().blocks(vec![block]))
1883                    .unwrap();
1884            } else {
1885                dag_state.accept_blocks(vec![block]);
1886            }
1887        });
1888
1889        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1890        // where the blocks of first 4 round should be found in DagState and the rest in store.
1891        let mut block_refs = blocks
1892            .iter()
1893            .map(|block| block.reference())
1894            .collect::<Vec<_>>();
1895        let result = dag_state.get_blocks(&block_refs);
1896
1897        let mut expected = blocks
1898            .into_iter()
1899            .map(Some)
1900            .collect::<Vec<Option<VerifiedBlock>>>();
1901
1902        // Ensure everything is found
1903        assert_eq!(result, expected.clone());
1904
1905        // Now try to ask also for one block ref that is neither in cache nor in store
1906        block_refs.insert(
1907            3,
1908            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1909        );
1910        let result = dag_state.get_blocks(&block_refs);
1911
1912        // Then all should be found apart from the last one
1913        expected.insert(3, None);
1914        assert_eq!(result, expected);
1915    }
1916
    #[tokio::test]
    async fn test_flush_and_recovery() {
        // Verifies that flushed blocks/commits survive a restart while unflushed data
        // is dropped, and that eviction rounds, committed flags and hard-link status
        // are recovered correctly.
        telemetry_subscribers::init_for_testing();

        const GC_DEPTH: u32 = 3;
        const CACHED_ROUNDS: u32 = 4;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build a 20-round DAG where authority 0 produces no blocks in rounds 6..=8.
        const NUM_ROUNDS: Round = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();
        dag_builder
            .layers(6..=8)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(9..=NUM_ROUNDS).build();

        // Get all commits from the DAG builder.
        const LAST_COMMIT_ROUND: Round = 16;
        const LAST_COMMIT_INDEX: CommitIndex = 15;
        let commits = dag_builder
            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
            .into_iter()
            .map(|(_subdag, commit)| commit)
            .take(LAST_COMMIT_INDEX as usize)
            .collect::<Vec<_>>();
        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);

        // Add the blocks from first 12 rounds and first 8 commits to the dag state
        // Note that the commit of round 8 is missing because where authority 0 is the leader but produced no block.
        // So commit 8 has leader round 9.
        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
        const NUM_PERSISTED_COMMITS: usize = 8;
        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
        let mut finalized_commits = vec![];
        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
            finalized_commits.push(commit.clone());
            dag_state.add_commit(commit);
        }
        let last_finalized_commit = finalized_commits.last().unwrap();
        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);

        // Collect finalized blocks.
        let finalized_blocks = finalized_commits
            .iter()
            .flat_map(|commit| commit.blocks())
            .collect::<BTreeSet<_>>();

        // Flush commits from the dag state
        dag_state.flush();

        // Verify the store has blocks up to round 12, and commits up to index 8.
        let store_blocks = store
            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
            .unwrap();
        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
        assert_eq!(
            store_commits.last().unwrap().index(),
            LAST_PERSISTED_COMMIT_INDEX
        );
        assert_eq!(
            store_commits.last().unwrap().round(),
            LAST_PERSISTED_COMMIT_ROUND
        );

        // Add the rest of the blocks and commits to the dag state, WITHOUT flushing.
        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
            dag_state.add_commit(commit);
        }

        // All blocks should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Last commit index from DagState should now be 15
        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);

        // Destroy the dag state without flushing additional data.
        drop(dag_state);

        // Recover the state from the store
        let dag_state = DagState::new(context.clone(), store.clone());

        // Persisted blocks rounds should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Unpersisted blocks should not be in DagState, because they are not flushed.
        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
        let block_refs = missing_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let retrieved_blocks = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        assert!(retrieved_blocks.is_empty());

        // Recovered last commit index and round should be 8 and 9.
        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);

        // The last_commit_rounds of the finalized commits should have been recovered.
        let expected_last_committed_rounds = vec![5, 9, 8, 8];
        assert_eq!(
            dag_state.last_committed_rounds(),
            expected_last_committed_rounds
        );
        // Unscored subdags will be recovered based on the flushed commits and no commit info.
        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);

        // Ensure that cached blocks exist only for specific rounds per authority
        for (authority_index, _) in context.committee.authorities() {
            let blocks = dag_state.get_cached_blocks(authority_index, 1);

            // Ensure that eviction rounds have been properly recovered.
            // For every authority, the gc round is 9 - 3 = 6, and cached round is 12-5 = 7.
            // So eviction round is the min which is 6.
            if authority_index == AuthorityIndex::new_for_test(0) {
                // Authority 0 has no blocks in rounds 6..=8, hence only 4 cached blocks.
                assert_eq!(blocks.len(), 4);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            } else {
                assert_eq!(blocks.len(), 6);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            }
        }

        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
        let gc_round = dag_state.gc_round();
        assert_eq!(gc_round, 6);
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
                    assert!(
                        block_info.committed,
                        "Block {:?} should be set as committed",
                        block_ref
                    );
                }
            });

        // Ensure the hard linked status of blocks are recovered.
        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
        // Other blocks (round 12 but not from authority 0) should not be hard linked.
        // This is because authority 0 blocks are considered proposed blocks.
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
                    assert!(block_info.included);
                } else {
                    assert!(!block_info.included);
                }
            });
    }
2124
2125    #[tokio::test]
2126    async fn test_block_info_as_committed() {
2127        let num_authorities: u32 = 4;
2128        let (context, _) = Context::new_for_test(num_authorities as usize);
2129        let context = Arc::new(context);
2130
2131        let store = Arc::new(MemStore::new());
2132        let mut dag_state = DagState::new(context.clone(), store.clone());
2133
2134        // Accept a block
2135        let block = VerifiedBlock::new_for_test(
2136            TestBlock::new(1, 0)
2137                .set_timestamp_ms(1000)
2138                .set_ancestors(vec![])
2139                .build(),
2140        );
2141
2142        dag_state.accept_block(block.clone());
2143
2144        // Query is committed
2145        assert!(!dag_state.is_committed(&block.reference()));
2146
2147        // Set block as committed for first time should return true
2148        assert!(
2149            dag_state.set_committed(&block.reference()),
2150            "Block should be successfully set as committed for first time"
2151        );
2152
2153        // Now it should appear as committed
2154        assert!(dag_state.is_committed(&block.reference()));
2155
2156        // Trying to set the block as committed again, it should return false.
2157        assert!(
2158            !dag_state.set_committed(&block.reference()),
2159            "Block should not be successfully set as committed"
2160        );
2161    }
2162
    #[tokio::test]
    async fn test_get_cached_blocks() {
        // Verifies get_cached_blocks() and get_cached_blocks_in_range() against an
        // uneven DAG where each authority has a different number of cached blocks.
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = 5;

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Create no blocks for authority 0
        // Create one block (round 10) for authority 1
        // Create two blocks (rounds 10,11) for authority 2
        // Create three blocks (rounds 10,11,12) for authority 3
        let mut all_blocks = Vec::new();
        for author in 1..=3 {
            for round in 10..(10 + author) {
                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
                all_blocks.push(block.clone());
                dag_state.accept_block(block);
            }
        }

        // Test get_cached_blocks()

        // Authority 0 has no blocks at all.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
        assert!(cached_blocks.is_empty());

        // Authority 1: single block at round 10.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
        assert_eq!(cached_blocks.len(), 1);
        assert_eq!(cached_blocks[0].round(), 10);

        // Authority 2: both blocks returned when starting from round 10.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
        assert_eq!(cached_blocks.len(), 2);
        assert_eq!(cached_blocks[0].round(), 10);
        assert_eq!(cached_blocks[1].round(), 11);

        // Authority 2: starting from round 11 excludes the round 10 block.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
        assert_eq!(cached_blocks.len(), 1);
        assert_eq!(cached_blocks[0].round(), 11);

        // Authority 3: all three blocks from round 10.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
        assert_eq!(cached_blocks.len(), 3);
        assert_eq!(cached_blocks[0].round(), 10);
        assert_eq!(cached_blocks[1].round(), 11);
        assert_eq!(cached_blocks[2].round(), 12);

        // Authority 3: only the last block when starting from round 12.
        let cached_blocks =
            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
        assert_eq!(cached_blocks.len(), 1);
        assert_eq!(cached_blocks[0].round(), 12);

        // Test get_cached_blocks_in_range()

        // Start == end: empty range returns nothing.
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(3).unwrap(),
            10,
            10,
            1,
        );
        assert!(cached_blocks.is_empty());

        // Start > end: inverted range returns nothing.
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(3).unwrap(),
            11,
            10,
            1,
        );
        assert!(cached_blocks.is_empty());

        // Empty result: authority 0 has no blocks.
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(0).unwrap(),
            9,
            10,
            1,
        );
        assert!(cached_blocks.is_empty());

        // Single block, one round before the end (end round is exclusive).
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(1).unwrap(),
            9,
            11,
            1,
        );
        assert_eq!(cached_blocks.len(), 1);
        assert_eq!(cached_blocks[0].round(), 10);

        // Respect end round: round 12 is outside [9, 12).
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(2).unwrap(),
            9,
            12,
            5,
        );
        assert_eq!(cached_blocks.len(), 2);
        assert_eq!(cached_blocks[0].round(), 10);
        assert_eq!(cached_blocks[1].round(), 11);

        // Respect start round: round 10 is below the start.
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(3).unwrap(),
            11,
            20,
            5,
        );
        assert_eq!(cached_blocks.len(), 2);
        assert_eq!(cached_blocks[0].round(), 11);
        assert_eq!(cached_blocks[1].round(), 12);

        // Respect limit: only the first block is returned.
        let cached_blocks = dag_state.get_cached_blocks_in_range(
            context.committee.to_authority_index(3).unwrap(),
            10,
            20,
            1,
        );
        assert_eq!(cached_blocks.len(), 1);
        assert_eq!(cached_blocks[0].round(), 10);
    }
2290
    #[tokio::test]
    async fn test_get_last_cached_block() {
        // GIVEN a DagState that caches 2 rounds of blocks per authority and
        // garbage-collects with a depth of 1 round.
        const CACHED_ROUNDS: Round = 2;
        const GC_DEPTH: u32 = 1;
        let (mut context, _) = Context::new_for_test(4);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context
            .protocol_config
            .set_consensus_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Create no blocks for authority 0 (A in the DAG DSL)
        // Create one block (round 1) for authority 1 (B)
        // Create two blocks (rounds 1,2) for authority 2 (C)
        // Create three blocks (rounds 1,2,3) for authority 3 (D)
        let dag_str = "DAG {
            Round 0 : { 4 },
            Round 1 : {
                B -> [*],
                C -> [*],
                D -> [*],
            },
            Round 2 : {
                C -> [*],
                D -> [*],
            },
            Round 3 : {
                D -> [*],
            },
        }";

        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");

        // Add an equivocating block for round 2, authority 2 — TestBlock::new
        // takes (round, author), so authority 2 ends up with two round-2 blocks.
        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());

        // Accept all blocks, including the equivocating one.
        for block in dag_builder
            .all_blocks()
            .into_iter()
            .chain(std::iter::once(block))
        {
            dag_state.accept_block(block);
        }

        // Commit the round 3 leader so later eviction has a commit round to work from.
        dag_state.add_commit(TrustedCommit::new_for_test(
            1 as CommitIndex,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            dag_builder.leader_block(3).unwrap().reference(),
            vec![],
        ));

        // WHEN search for the latest blocks
        let end_round = 4;
        let expected_rounds = vec![0, 1, 2, 3];
        // Only authority 2 has an equivocating block (the extra round 2 block above).
        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
        // THEN each authority's last cached block matches its expected highest round;
        // round 0 is the genesis block for authorities that proposed nothing.
        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );
        assert_eq!(
            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
            expected_excluded_and_equivocating_blocks
        );

        // THEN the per-authority range lookup over [0, end_round) agrees.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // WHEN starting from round 2: authorities whose blocks are all below
        // round 2 yield None, reported as round 0 via unwrap_or_default().
        let start_round = 2;
        let expected_rounds = [0, 0, 2, 3];

        // THEN
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    start_round,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }

        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
        //
        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
        // all their highest round blocks for CACHED_ROUNDS.
        dag_state.flush();

        // AND we request before round 3
        let end_round = 3;
        let expected_rounds = vec![0, 1, 2, 2];

        // THEN the per-authority results reflect the post-flush cache contents.
        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
        assert_eq!(
            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
            expected_rounds
        );

        // THEN the range lookup agrees with the post-flush state as well.
        for (i, expected_round) in expected_rounds.iter().enumerate() {
            let round = dag_state
                .get_last_cached_block_in_range(
                    context.committee.to_authority_index(i).unwrap(),
                    0,
                    end_round,
                )
                .map(|b| b.round())
                .unwrap_or_default();
            assert_eq!(round, *expected_round, "Authority {i}");
        }
    }
2424
2425    #[tokio::test]
2426    #[should_panic(
2427        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2428    )]
2429    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2430        // GIVEN
2431        const CACHED_ROUNDS: Round = 1;
2432        const GC_DEPTH: u32 = 1;
2433        let (mut context, _) = Context::new_for_test(4);
2434        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2435        context
2436            .protocol_config
2437            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2438
2439        let context = Arc::new(context);
2440        let store = Arc::new(MemStore::new());
2441        let mut dag_state = DagState::new(context.clone(), store.clone());
2442
2443        // Create no blocks for authority 0
2444        // Create one block (round 1) for authority 1
2445        // Create two blocks (rounds 1,2) for authority 2
2446        // Create three blocks (rounds 1,2,3) for authority 3
2447        let mut dag_builder = DagBuilder::new(context.clone());
2448        dag_builder
2449            .layers(1..=1)
2450            .authorities(vec![AuthorityIndex::new_for_test(0)])
2451            .skip_block()
2452            .build();
2453        dag_builder
2454            .layers(2..=2)
2455            .authorities(vec![
2456                AuthorityIndex::new_for_test(0),
2457                AuthorityIndex::new_for_test(1),
2458            ])
2459            .skip_block()
2460            .build();
2461        dag_builder
2462            .layers(3..=3)
2463            .authorities(vec![
2464                AuthorityIndex::new_for_test(0),
2465                AuthorityIndex::new_for_test(1),
2466                AuthorityIndex::new_for_test(2),
2467            ])
2468            .skip_block()
2469            .build();
2470
2471        // Accept all blocks
2472        for block in dag_builder.all_blocks() {
2473            dag_state.accept_block(block);
2474        }
2475
2476        dag_state.add_commit(TrustedCommit::new_for_test(
2477            1 as CommitIndex,
2478            CommitDigest::MIN,
2479            0,
2480            dag_builder.leader_block(3).unwrap().reference(),
2481            vec![],
2482        ));
2483
2484        // Flush the store so we update the evict rounds
2485        dag_state.flush();
2486
2487        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2488        dag_state.get_last_cached_block_per_authority(2);
2489    }
2490
2491    #[tokio::test]
2492    async fn test_last_quorum() {
2493        // GIVEN
2494        let (context, _) = Context::new_for_test(4);
2495        let context = Arc::new(context);
2496        let store = Arc::new(MemStore::new());
2497        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2498
2499        // WHEN no blocks exist then genesis should be returned
2500        {
2501            let genesis = genesis_blocks(context.as_ref());
2502
2503            assert_eq!(dag_state.read().last_quorum(), genesis);
2504        }
2505
2506        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2507        {
2508            let mut dag_builder = DagBuilder::new(context.clone());
2509            dag_builder
2510                .layers(1..=4)
2511                .build()
2512                .persist_layers(dag_state.clone());
2513            let round_4_blocks: Vec<_> = dag_builder
2514                .blocks(4..=4)
2515                .into_iter()
2516                .map(|block| block.reference())
2517                .collect();
2518
2519            let last_quorum = dag_state.read().last_quorum();
2520
2521            assert_eq!(
2522                last_quorum
2523                    .into_iter()
2524                    .map(|block| block.reference())
2525                    .collect::<Vec<_>>(),
2526                round_4_blocks
2527            );
2528        }
2529
2530        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2531        {
2532            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2533            dag_state.write().accept_block(block);
2534
2535            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2536
2537            let last_quorum = dag_state.read().last_quorum();
2538
2539            assert_eq!(last_quorum, round_4_blocks);
2540        }
2541    }
2542
2543    #[tokio::test]
2544    async fn test_last_block_for_authority() {
2545        // GIVEN
2546        let (context, _) = Context::new_for_test(4);
2547        let context = Arc::new(context);
2548        let store = Arc::new(MemStore::new());
2549        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2550
2551        // WHEN no blocks exist then genesis should be returned
2552        {
2553            let genesis = genesis_blocks(context.as_ref());
2554            let my_genesis = genesis
2555                .into_iter()
2556                .find(|block| block.author() == context.own_index)
2557                .unwrap();
2558
2559            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2560        }
2561
2562        // WHEN adding some blocks for authorities, only the last ones should be returned
2563        {
2564            // add blocks up to round 4
2565            let mut dag_builder = DagBuilder::new(context.clone());
2566            dag_builder
2567                .layers(1..=4)
2568                .build()
2569                .persist_layers(dag_state.clone());
2570
2571            // add block 5 for authority 0
2572            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2573            dag_state.write().accept_block(block);
2574
2575            let block = dag_state
2576                .read()
2577                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2578            assert_eq!(block.round(), 5);
2579
2580            for (authority_index, _) in context.committee.authorities() {
2581                let block = dag_state
2582                    .read()
2583                    .get_last_block_for_authority(authority_index);
2584
2585                if authority_index.value() == 0 {
2586                    assert_eq!(block.round(), 5);
2587                } else {
2588                    assert_eq!(block.round(), 4);
2589                }
2590            }
2591        }
2592    }
2593
2594    #[tokio::test]
2595    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2596        // GIVEN
2597        let (context, _) = Context::new_for_test(4);
2598        let context = Arc::new(context);
2599        let store = Arc::new(MemStore::new());
2600        let mut dag_state = DagState::new(context.clone(), store.clone());
2601
2602        // Set a timestamp for the block that is ahead of the current time
2603        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2604
2605        let block = VerifiedBlock::new_for_test(
2606            TestBlock::new(10, 0)
2607                .set_timestamp_ms(block_timestamp)
2608                .build(),
2609        );
2610
2611        // Try to accept the block - it should not panic
2612        dag_state.accept_block(block);
2613    }
2614
2615    #[tokio::test]
2616    async fn test_last_finalized_commit() {
2617        // GIVEN
2618        let (context, _) = Context::new_for_test(4);
2619        let context = Arc::new(context);
2620        let store = Arc::new(MemStore::new());
2621        let mut dag_state = DagState::new(context.clone(), store.clone());
2622
2623        // WHEN adding a finalized commit
2624        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2625        let rejected_transactions = BTreeMap::new();
2626        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2627
2628        // THEN the commit should be added to the buffer
2629        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2630        assert_eq!(
2631            dag_state.finalized_commits_to_write[0],
2632            (commit_ref, rejected_transactions.clone())
2633        );
2634
2635        // WHEN flushing the DAG state
2636        dag_state.flush();
2637
2638        // THEN the commit and rejected transactions should be written to storage
2639        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2640        assert_eq!(last_finalized_commit, Some(commit_ref));
2641        let stored_rejected_transactions = store
2642            .read_rejected_transactions(commit_ref)
2643            .unwrap()
2644            .unwrap();
2645        assert_eq!(stored_rejected_transactions, rejected_transactions);
2646    }
2647}