consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21    CommittedSubDag,
22    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23    commit::{
24        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26    },
27    context::Context,
28    leader_scoring::{ReputationScores, ScoringSubdag},
29    storage::{Store, WriteBatch},
30    threshold_clock::ThresholdClock,
31};
32
33/// DagState provides the API to write and read accepted blocks from the DAG.
34/// Only uncommitted and last committed blocks are cached in memory.
35/// The rest of blocks are stored on disk.
36/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
37///
38/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
39/// concurrent access from multiple components.
40pub struct DagState {
41    context: Arc<Context>,
42
43    // The genesis blocks
44    genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
47    // Note: all uncommitted blocks are kept in memory.
48    //
49    // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always have available
50    // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS.
51    // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`.
52    // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached.
53    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55    // Indexes recent block refs by their authorities.
56    // Vec position corresponds to the authority index.
57    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59    // Keeps track of the threshold clock for proposing blocks.
60    threshold_clock: ThresholdClock,
61
62    // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
63    // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear.
64    // The `evicted_rounds` size should be the same as the committee size.
65    evicted_rounds: Vec<Round>,
66
67    // Highest round of blocks accepted.
68    highest_accepted_round: Round,
69
70    // Last consensus commit of the dag.
71    last_commit: Option<TrustedCommit>,
72
73    // Last wall time when commit round advanced. Does not persist across restarts.
74    last_commit_round_advancement_time: Option<std::time::Instant>,
75
76    // Last committed rounds per authority.
77    last_committed_rounds: Vec<Round>,
78
79    /// The committed subdags that have been scored but scores have not been used
80    /// for leader schedule yet.
81    scoring_subdag: ScoringSubdag,
82
83    // Commit votes pending to be included in new blocks.
84    // TODO: limit to 1st commit per round with multi-leader.
85    // TODO: recover unproposed pending commit votes at startup.
86    pending_commit_votes: VecDeque<CommitVote>,
87
88    // Blocks and commits must be buffered for persistence before they can be
89    // inserted into the local DAG or sent to output.
90    blocks_to_write: Vec<VerifiedBlock>,
91    commits_to_write: Vec<TrustedCommit>,
92
93    // Buffers the reputation scores & last_committed_rounds to be flushed with the
94    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
95    // & last_committed_rounds from the commits as needed.
96    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98    // Buffers finalized commits and their rejected transactions to be written to storage.
99    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101    // Persistent storage for blocks, commits and other consensus data.
102    store: Arc<dyn Store>,
103
104    // The number of cached rounds
105    cached_rounds: Round,
106}
107
108impl DagState {
109    /// Initializes DagState from storage.
110    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112        let num_authorities = context.committee.size();
113
114        let genesis = genesis_blocks(context.as_ref())
115            .into_iter()
116            .map(|block| (block.reference(), block))
117            .collect();
118
119        let threshold_clock = ThresholdClock::new(1, context.clone());
120
121        let last_commit = store
122            .read_last_commit()
123            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125        let commit_info = store
126            .read_last_commit_info()
127            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128        let (mut last_committed_rounds, commit_recovery_start_index) =
129            if let Some((commit_ref, commit_info)) = commit_info {
130                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131                (commit_info.committed_rounds, commit_ref.index + 1)
132            } else {
133                tracing::info!("Found no stored CommitInfo to recover from");
134                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135            };
136
137        let mut unscored_committed_subdags = Vec::new();
138        let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140        if let Some(last_commit) = last_commit.as_ref() {
141            store
142                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144                .iter()
145                .for_each(|commit| {
146                    for block_ref in commit.blocks() {
147                        last_committed_rounds[block_ref.author] =
148                            max(last_committed_rounds[block_ref.author], block_ref.round);
149                    }
150                    let committed_subdag = load_committed_subdag_from_store(
151                        &context,
152                        store.as_ref(),
153                        commit.clone(),
154                        vec![],
155                    );
156                    // We don't need to recover reputation scores for unscored_committed_subdags
157                    unscored_committed_subdags.push(committed_subdag);
158                });
159        }
160
161        tracing::info!(
162            "DagState was initialized with the following state: \
163            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
164            unscored_committed_subdags.len()
165        );
166
167        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
168
169        let mut state = Self {
170            context: context.clone(),
171            genesis,
172            recent_blocks: BTreeMap::new(),
173            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
174            threshold_clock,
175            highest_accepted_round: 0,
176            last_commit: last_commit.clone(),
177            last_commit_round_advancement_time: None,
178            last_committed_rounds: last_committed_rounds.clone(),
179            pending_commit_votes: VecDeque::new(),
180            blocks_to_write: vec![],
181            commits_to_write: vec![],
182            commit_info_to_write: vec![],
183            finalized_commits_to_write: vec![],
184            scoring_subdag,
185            store: store.clone(),
186            cached_rounds,
187            evicted_rounds: vec![0; num_authorities],
188        };
189
190        for (authority_index, _) in context.committee.authorities() {
191            let (blocks, eviction_round) = {
192                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
193                // As reminder, the eviction round is taking into account the gc_round.
194                let last_block = state
195                    .store
196                    .scan_last_blocks_by_author(authority_index, 1, None)
197                    .expect("Database error");
198                let last_block_round = last_block
199                    .last()
200                    .map(|b| b.round())
201                    .unwrap_or(GENESIS_ROUND);
202
203                let eviction_round =
204                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
205                let blocks = state
206                    .store
207                    .scan_blocks_by_author(authority_index, eviction_round + 1)
208                    .expect("Database error");
209
210                (blocks, eviction_round)
211            };
212
213            state.evicted_rounds[authority_index] = eviction_round;
214
215            // Update the block metadata for the authority.
216            for block in &blocks {
217                state.update_block_metadata(block);
218            }
219
220            debug!(
221                "Recovered blocks {}: {:?}",
222                authority_index,
223                blocks
224                    .iter()
225                    .map(|b| b.reference())
226                    .collect::<Vec<BlockRef>>()
227            );
228        }
229
230        if let Some(last_commit) = last_commit {
231            let mut index = last_commit.index();
232            let gc_round = state.gc_round();
233            info!(
234                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
235                index, gc_round
236            );
237
238            loop {
239                let commits = store
240                    .scan_commits((index..=index).into())
241                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
242                let Some(commit) = commits.first() else {
243                    info!("Recovering finished up to index {index}, no more commits to recover");
244                    break;
245                };
246
247                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
248                if gc_round > 0 && commit.leader().round <= gc_round {
249                    info!(
250                        "Recovering finished, reached commit leader round {} <= gc_round {}",
251                        commit.leader().round,
252                        gc_round
253                    );
254                    break;
255                }
256
257                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
258                    debug!(
259                        "Setting block {:?} as committed based on commit {:?}",
260                        block_ref,
261                        commit.index()
262                    );
263                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
264                });
265
266                // All commits are indexed starting from 1, so one reach zero exit.
267                index = index.saturating_sub(1);
268                if index == 0 {
269                    break;
270                }
271            }
272        }
273
274        // Recover hard linked statuses for blocks within GC round.
275        let proposed_blocks = store
276            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
277            .expect("Database error");
278        for block in proposed_blocks {
279            state.link_causal_history(block.reference());
280        }
281
282        state
283    }
284
285    /// Accepts a block into DagState and keeps it in memory.
286    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
287        assert_ne!(
288            block.round(),
289            0,
290            "Genesis block should not be accepted into DAG."
291        );
292
293        let block_ref = block.reference();
294        if self.contains_block(&block_ref) {
295            return;
296        }
297
298        let now = self.context.clock.timestamp_utc_ms();
299        if block.timestamp_ms() > now {
300            trace!(
301                "Block {:?} with timestamp {} is greater than local timestamp {}.",
302                block,
303                block.timestamp_ms(),
304                now,
305            );
306        }
307        let hostname = &self.context.committee.authority(block_ref.author).hostname;
308        self.context
309            .metrics
310            .node_metrics
311            .accepted_block_time_drift_ms
312            .with_label_values(&[hostname])
313            .inc_by(block.timestamp_ms().saturating_sub(now));
314
315        // TODO: Move this check to core
316        // Ensure we don't write multiple blocks per slot for our own index
317        if block_ref.author == self.context.own_index {
318            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
319            if !self
320                .context
321                .parameters
322                .internal
323                .skip_equivocation_validation
324            {
325                assert!(
326                    existing_blocks.is_empty(),
327                    "Block Rejected! Attempted to add block {block:#?} to own slot where \
328                    block(s) {existing_blocks:#?} already exists."
329                );
330            }
331        }
332        self.update_block_metadata(&block);
333        self.blocks_to_write.push(block);
334        let source = if self.context.own_index == block_ref.author {
335            "own"
336        } else {
337            "others"
338        };
339        self.context
340            .metrics
341            .node_metrics
342            .accepted_blocks
343            .with_label_values(&[source])
344            .inc();
345    }
346
347    /// Updates internal metadata for a block.
348    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
349        let block_ref = block.reference();
350        self.recent_blocks
351            .insert(block_ref, BlockInfo::new(block.clone()));
352        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
353
354        if self.threshold_clock.add_block(block_ref) {
355            // Do not measure quorum delay when no local block is proposed in the round.
356            let last_proposed_block = self.get_last_proposed_block();
357            if last_proposed_block.round() == block_ref.round {
358                let quorum_delay_ms = self
359                    .context
360                    .clock
361                    .timestamp_utc_ms()
362                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
363                self.context
364                    .metrics
365                    .node_metrics
366                    .quorum_receive_latency
367                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
368            }
369        }
370
371        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
372        self.context
373            .metrics
374            .node_metrics
375            .highest_accepted_round
376            .set(self.highest_accepted_round as i64);
377
378        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
379            .last()
380            .map(|block_ref| block_ref.round)
381            .expect("There should be by now at least one block ref");
382        let hostname = &self.context.committee.authority(block_ref.author).hostname;
383        self.context
384            .metrics
385            .node_metrics
386            .highest_accepted_authority_round
387            .with_label_values(&[hostname])
388            .set(highest_accepted_round_for_author as i64);
389    }
390
391    /// Accepts a blocks into DagState and keeps it in memory.
392    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
393        debug!(
394            "Accepting blocks: {}",
395            blocks.iter().map(|b| b.reference().to_string()).join(",")
396        );
397        for block in blocks {
398            self.accept_block(block);
399        }
400    }
401
402    /// Gets a block by checking cached recent blocks then storage.
403    /// Returns None when the block is not found.
404    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
405        self.get_blocks(&[*reference])
406            .pop()
407            .expect("Exactly one element should be returned")
408    }
409
410    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
411    /// An element is None when the corresponding block is not found.
412    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
413        let mut blocks = vec![None; block_refs.len()];
414        let mut missing = Vec::new();
415
416        for (index, block_ref) in block_refs.iter().enumerate() {
417            if block_ref.round == GENESIS_ROUND {
418                // Allow the caller to handle the invalid genesis ancestor error.
419                if let Some(block) = self.genesis.get(block_ref) {
420                    blocks[index] = Some(block.clone());
421                }
422                continue;
423            }
424            if let Some(block_info) = self.recent_blocks.get(block_ref) {
425                blocks[index] = Some(block_info.block.clone());
426                continue;
427            }
428            missing.push((index, block_ref));
429        }
430
431        if missing.is_empty() {
432            return blocks;
433        }
434
435        let missing_refs = missing
436            .iter()
437            .map(|(_, block_ref)| **block_ref)
438            .collect::<Vec<_>>();
439        let store_results = self
440            .store
441            .read_blocks(&missing_refs)
442            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
443        self.context
444            .metrics
445            .node_metrics
446            .dag_state_store_read_count
447            .with_label_values(&["get_blocks"])
448            .inc();
449
450        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
451            blocks[index] = result;
452        }
453
454        blocks
455    }
456
457    /// Gets all uncommitted blocks in a slot.
458    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
459    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
460        // TODO: either panic below when the slot is at or below the last committed round,
461        // or support reading from storage while limiting storage reads to edge cases.
462
463        let mut blocks = vec![];
464        for (_block_ref, block_info) in self.recent_blocks.range((
465            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
466            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
467        )) {
468            blocks.push(block_info.block.clone())
469        }
470        blocks
471    }
472
473    /// Gets all uncommitted blocks in a round.
474    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
475    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
476        if round <= self.last_commit_round() {
477            panic!("Round {} have committed blocks!", round);
478        }
479
480        let mut blocks = vec![];
481        for (_block_ref, block_info) in self.recent_blocks.range((
482            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
483            Excluded(BlockRef::new(
484                round + 1,
485                AuthorityIndex::ZERO,
486                BlockDigest::MIN,
487            )),
488        )) {
489            blocks.push(block_info.block.clone())
490        }
491        blocks
492    }
493
494    /// Gets all ancestors in the history of a block at a certain round.
495    pub(crate) fn ancestors_at_round(
496        &self,
497        later_block: &VerifiedBlock,
498        earlier_round: Round,
499    ) -> Vec<VerifiedBlock> {
500        // Iterate through ancestors of later_block in round descending order.
501        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
502        while !linked.is_empty() {
503            let round = linked.last().unwrap().round;
504            // Stop after finishing traversal for ancestors above earlier_round.
505            if round <= earlier_round {
506                break;
507            }
508            let block_ref = linked.pop_last().unwrap();
509            let Some(block) = self.get_block(&block_ref) else {
510                panic!("Block {:?} should exist in DAG!", block_ref);
511            };
512            linked.extend(block.ancestors().iter().cloned());
513        }
514        linked
515            .range((
516                Included(BlockRef::new(
517                    earlier_round,
518                    AuthorityIndex::ZERO,
519                    BlockDigest::MIN,
520                )),
521                Unbounded,
522            ))
523            .map(|r| {
524                self.get_block(r)
525                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
526                    .clone()
527            })
528            .collect()
529    }
530
531    /// Gets the last proposed block from this authority.
532    /// If no block is proposed yet, returns the genesis block.
533    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
534        self.get_last_block_for_authority(self.context.own_index)
535    }
536
537    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
538    /// then the genesis block is returned as no other block has been received from that authority.
539    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
540        if let Some(last) = self.recent_refs_by_authority[authority].last() {
541            return self
542                .recent_blocks
543                .get(last)
544                .expect("Block should be found in recent blocks")
545                .block
546                .clone();
547        }
548
549        // if none exists, then fallback to genesis
550        let (_, genesis_block) = self
551            .genesis
552            .iter()
553            .find(|(block_ref, _)| block_ref.author == authority)
554            .expect("Genesis should be found for authority {authority_index}");
555        genesis_block.clone()
556    }
557
558    /// Returns cached recent blocks from the specified authority.
559    /// Blocks returned are limited to round >= `start`, and cached.
560    /// NOTE: caller should not assume returned blocks are always chained.
561    /// "Disconnected" blocks can be returned when there are byzantine blocks,
562    /// or a previously evicted block is accepted again.
563    pub(crate) fn get_cached_blocks(
564        &self,
565        authority: AuthorityIndex,
566        start: Round,
567    ) -> Vec<VerifiedBlock> {
568        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
569    }
570
571    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
572    // limited in total number of blocks.
573    pub(crate) fn get_cached_blocks_in_range(
574        &self,
575        authority: AuthorityIndex,
576        start_round: Round,
577        end_round: Round,
578        limit: usize,
579    ) -> Vec<VerifiedBlock> {
580        if start_round >= end_round || limit == 0 {
581            return vec![];
582        }
583
584        let mut blocks = vec![];
585        for block_ref in self.recent_refs_by_authority[authority].range((
586            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
587            Excluded(BlockRef::new(
588                end_round,
589                AuthorityIndex::MIN,
590                BlockDigest::MIN,
591            )),
592        )) {
593            let block_info = self
594                .recent_blocks
595                .get(block_ref)
596                .expect("Block should exist in recent blocks");
597            blocks.push(block_info.block.clone());
598            if blocks.len() >= limit {
599                break;
600            }
601        }
602        blocks
603    }
604
605    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
606    pub(crate) fn get_last_cached_block_in_range(
607        &self,
608        authority: AuthorityIndex,
609        start_round: Round,
610        end_round: Round,
611    ) -> Option<VerifiedBlock> {
612        if start_round >= end_round {
613            return None;
614        }
615
616        let block_ref = self.recent_refs_by_authority[authority]
617            .range((
618                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
619                Excluded(BlockRef::new(
620                    end_round,
621                    AuthorityIndex::MIN,
622                    BlockDigest::MIN,
623                )),
624            ))
625            .last()?;
626
627        self.recent_blocks
628            .get(block_ref)
629            .map(|block_info| block_info.block.clone())
630    }
631
632    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
633    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
634    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
635    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
636    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
637    /// and the other equivocating blocks will be returned.
638    pub(crate) fn get_last_cached_block_per_authority(
639        &self,
640        end_round: Round,
641    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
642        // Initialize with the genesis blocks as fallback
643        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
644        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
645
646        if end_round == GENESIS_ROUND {
647            panic!(
648                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
649            );
650        }
651
652        if end_round == GENESIS_ROUND + 1 {
653            return blocks.into_iter().map(|b| (b, vec![])).collect();
654        }
655
656        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
657            let authority_index = self
658                .context
659                .committee
660                .to_authority_index(authority_index)
661                .unwrap();
662
663            let last_evicted_round = self.evicted_rounds[authority_index];
664            if end_round.saturating_sub(1) <= last_evicted_round {
665                panic!(
666                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
667                );
668            }
669
670            let block_ref_iter = block_refs
671                .range((
672                    Included(BlockRef::new(
673                        last_evicted_round + 1,
674                        authority_index,
675                        BlockDigest::MIN,
676                    )),
677                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
678                ))
679                .rev();
680
681            let mut last_round = 0;
682            for block_ref in block_ref_iter {
683                if last_round == 0 {
684                    last_round = block_ref.round;
685                    let block_info = self
686                        .recent_blocks
687                        .get(block_ref)
688                        .expect("Block should exist in recent blocks");
689                    blocks[authority_index] = block_info.block.clone();
690                    continue;
691                }
692                if block_ref.round < last_round {
693                    break;
694                }
695                equivocating_blocks[authority_index].push(*block_ref);
696            }
697        }
698
699        blocks.into_iter().zip(equivocating_blocks).collect()
700    }
701
702    /// Checks whether a block exists in the slot. The method checks only against the cached data.
703    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
704    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
705        // Always return true for genesis slots.
706        if slot.round == GENESIS_ROUND {
707            return true;
708        }
709
710        let eviction_round = self.evicted_rounds[slot.authority];
711        if slot.round <= eviction_round {
712            panic!(
713                "{}",
714                format!(
715                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
716                )
717            );
718        }
719
720        let mut result = self.recent_refs_by_authority[slot.authority].range((
721            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
722            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
723        ));
724        result.next().is_some()
725    }
726
727    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
728    /// back the results, so its expensive if keep asking for cache missing blocks.
729    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
730        let mut exist = vec![false; block_refs.len()];
731        let mut missing = Vec::new();
732
733        for (index, block_ref) in block_refs.into_iter().enumerate() {
734            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
735            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
736                exist[index] = true;
737            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
738            {
739                // Optimization: recent_refs contain the most recent blocks known to this authority.
740                // If a block ref is not found there and has a higher round, it definitely is
741                // missing from this authority and there is no need to check disk.
742                exist[index] = false;
743            } else {
744                missing.push((index, block_ref));
745            }
746        }
747
748        if missing.is_empty() {
749            return exist;
750        }
751
752        let missing_refs = missing
753            .iter()
754            .map(|(_, block_ref)| *block_ref)
755            .collect::<Vec<_>>();
756        let store_results = self
757            .store
758            .contains_blocks(&missing_refs)
759            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
760        self.context
761            .metrics
762            .node_metrics
763            .dag_state_store_read_count
764            .with_label_values(&["contains_blocks"])
765            .inc();
766
767        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
768            exist[index] = result;
769        }
770
771        exist
772    }
773
774    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
775        let blocks = self.contains_blocks(vec![*block_ref]);
776        blocks.first().cloned().unwrap()
777    }
778
779    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
780    // Method will panic if the block is not found in the cache.
781    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
782        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
783            if !block_info.committed {
784                block_info.committed = true;
785                return true;
786            }
787            false
788        } else {
789            panic!(
790                "Block {:?} not found in cache to set as committed.",
791                block_ref
792            );
793        }
794    }
795
796    /// Returns true if the block is committed. Only valid for blocks above the GC round.
797    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
798        self.recent_blocks
799            .get(block_ref)
800            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
801            .committed
802    }
803
804    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
805    /// Returns the list of blocks that are newly linked.
806    /// The returned blocks are guaranteed to be above the GC round.
807    /// Transaction votes for the returned blocks are retrieved and carried by the upcoming
808    /// proposed block.
809    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
810        let gc_round = self.gc_round();
811        let mut linked_blocks = vec![];
812        let mut targets = VecDeque::new();
813        targets.push_back(root_block);
814        while let Some(block_ref) = targets.pop_front() {
815            // No need to collect or mark blocks at or below GC round.
816            // These blocks and their causal history will not be included in new commits.
817            // And their transactions do not need votes to finalize or skip.
818            //
819            // CommitFinalizer::gced_transaction_votes_for_pending_block() is the counterpart
820            // to this logic, when deciding if block A in the causal history of block B gets
821            // implicit accept transaction votes from block B.
822            if block_ref.round <= gc_round {
823                continue;
824            }
825            let block_info = self
826                .recent_blocks
827                .get_mut(&block_ref)
828                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
829            if block_info.included {
830                continue;
831            }
832            linked_blocks.push(block_ref);
833            block_info.included = true;
834            targets.extend(block_info.block.ancestors().iter());
835        }
836        linked_blocks
837    }
838
839    /// Returns true if the block has been included in an owned proposed block.
840    /// NOTE: caller should make sure only blocks above GC round are queried.
841    pub(crate) fn has_been_included(&self, block_ref: &BlockRef) -> bool {
842        self.recent_blocks
843            .get(block_ref)
844            .unwrap_or_else(|| {
845                panic!(
846                    "Attempted to query for inclusion status for a block not in cached data {}",
847                    block_ref
848                )
849            })
850            .included
851    }
852
853    pub(crate) fn threshold_clock_round(&self) -> Round {
854        self.threshold_clock.get_round()
855    }
856
857    // The timestamp of when quorum threshold was last reached in the threshold clock.
858    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
859        self.threshold_clock.get_quorum_ts()
860    }
861
862    pub(crate) fn highest_accepted_round(&self) -> Round {
863        self.highest_accepted_round
864    }
865
866    // Buffers a new commit in memory and updates last committed rounds.
867    // REQUIRED: must not skip over any commit index.
868    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
869        let time_diff = if let Some(last_commit) = &self.last_commit {
870            if commit.index() <= last_commit.index() {
871                error!(
872                    "New commit index {} <= last commit index {}!",
873                    commit.index(),
874                    last_commit.index()
875                );
876                return;
877            }
878            assert_eq!(commit.index(), last_commit.index() + 1);
879
880            if commit.timestamp_ms() < last_commit.timestamp_ms() {
881                panic!(
882                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
883                    last_commit, commit
884                );
885            }
886            commit
887                .timestamp_ms()
888                .saturating_sub(last_commit.timestamp_ms())
889        } else {
890            assert_eq!(commit.index(), 1);
891            0
892        };
893
894        self.context
895            .metrics
896            .node_metrics
897            .last_commit_time_diff
898            .observe(time_diff as f64);
899
900        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
901            previous_commit.round() < commit.round()
902        } else {
903            true
904        };
905
906        self.last_commit = Some(commit.clone());
907
908        if commit_round_advanced {
909            let now = std::time::Instant::now();
910            if let Some(previous_time) = self.last_commit_round_advancement_time {
911                self.context
912                    .metrics
913                    .node_metrics
914                    .commit_round_advancement_interval
915                    .observe(now.duration_since(previous_time).as_secs_f64())
916            }
917            self.last_commit_round_advancement_time = Some(now);
918        }
919
920        for block_ref in commit.blocks().iter() {
921            self.last_committed_rounds[block_ref.author] = max(
922                self.last_committed_rounds[block_ref.author],
923                block_ref.round,
924            );
925        }
926
927        for (i, round) in self.last_committed_rounds.iter().enumerate() {
928            let index = self.context.committee.to_authority_index(i).unwrap();
929            let hostname = &self.context.committee.authority(index).hostname;
930            self.context
931                .metrics
932                .node_metrics
933                .last_committed_authority_round
934                .with_label_values(&[hostname])
935                .set((*round).into());
936        }
937
938        self.pending_commit_votes.push_back(commit.reference());
939        self.commits_to_write.push(commit);
940    }
941
942    /// Recovers commits to write from storage, at startup.
943    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
944        self.commits_to_write.extend(commits);
945    }
946
947    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
948        assert!(
949            self.commits_to_write.is_empty(),
950            "Commits to write should be empty. {:?}",
951            self.commits_to_write,
952        );
953    }
954
955    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
956        // We create an empty scoring subdag once reputation scores are calculated.
957        // Note: It is okay for this to not be gated by protocol config as the
958        // scoring_subdag should be empty in either case at this point.
959        assert!(self.scoring_subdag.is_empty());
960
961        let commit_info = CommitInfo {
962            committed_rounds: self.last_committed_rounds.clone(),
963            reputation_scores,
964        };
965        let last_commit = self
966            .last_commit
967            .as_ref()
968            .expect("Last commit should already be set.");
969        self.commit_info_to_write
970            .push((last_commit.reference(), commit_info));
971    }
972
973    pub(crate) fn add_finalized_commit(
974        &mut self,
975        commit_ref: CommitRef,
976        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
977    ) {
978        self.finalized_commits_to_write
979            .push((commit_ref, rejected_transactions));
980    }
981
982    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
983        let mut votes = Vec::new();
984        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
985            votes.push(self.pending_commit_votes.pop_front().unwrap());
986        }
987        votes
988    }
989
990    /// Index of the last commit.
991    pub(crate) fn last_commit_index(&self) -> CommitIndex {
992        match &self.last_commit {
993            Some(commit) => commit.index(),
994            None => 0,
995        }
996    }
997
998    /// Digest of the last commit.
999    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
1000        match &self.last_commit {
1001            Some(commit) => commit.digest(),
1002            None => CommitDigest::MIN,
1003        }
1004    }
1005
1006    /// Timestamp of the last commit.
1007    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
1008        match &self.last_commit {
1009            Some(commit) => commit.timestamp_ms(),
1010            None => 0,
1011        }
1012    }
1013
1014    /// Leader slot of the last commit.
1015    pub(crate) fn last_commit_leader(&self) -> Slot {
1016        match &self.last_commit {
1017            Some(commit) => commit.leader().into(),
1018            None => self
1019                .genesis
1020                .iter()
1021                .next()
1022                .map(|(genesis_ref, _)| *genesis_ref)
1023                .expect("Genesis blocks should always be available.")
1024                .into(),
1025        }
1026    }
1027
1028    /// Highest round where a block is committed, which is last commit's leader round.
1029    pub(crate) fn last_commit_round(&self) -> Round {
1030        match &self.last_commit {
1031            Some(commit) => commit.leader().round,
1032            None => 0,
1033        }
1034    }
1035
1036    /// Last committed round per authority.
1037    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1038        self.last_committed_rounds.clone()
1039    }
1040
1041    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
1042    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
1043    /// committed leader round. When GC is disabled that will return the genesis round.
1044    pub(crate) fn gc_round(&self) -> Round {
1045        self.calculate_gc_round(self.last_commit_round())
1046    }
1047
1048    /// Calculates the GC round from the input leader round, which can be different
1049    /// from the last committed leader round.
1050    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1051        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1052    }
1053
1054    /// Flushes unpersisted blocks, commits and commit info to storage.
1055    ///
1056    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
1057    /// must also be buffered.
1058    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
1059    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
1060    ///
1061    /// After each flush, DagState becomes persisted in storage and it expected to recover
1062    /// all internal states from storage after restarts.
1063    pub(crate) fn flush(&mut self) {
1064        let _s = self
1065            .context
1066            .metrics
1067            .node_metrics
1068            .scope_processing_time
1069            .with_label_values(&["DagState::flush"])
1070            .start_timer();
1071
1072        // Flush buffered data to storage.
1073        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1074        let pending_commits = std::mem::take(&mut self.commits_to_write);
1075        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1076        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1077        if pending_blocks.is_empty()
1078            && pending_commits.is_empty()
1079            && pending_commit_info.is_empty()
1080            && pending_finalized_commits.is_empty()
1081        {
1082            return;
1083        }
1084
1085        debug!(
1086            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1087            pending_blocks.len(),
1088            pending_blocks
1089                .iter()
1090                .map(|b| b.reference().to_string())
1091                .join(","),
1092            pending_commits.len(),
1093            pending_commits
1094                .iter()
1095                .map(|c| c.reference().to_string())
1096                .join(","),
1097            pending_commit_info.len(),
1098            pending_commit_info
1099                .iter()
1100                .map(|(commit_ref, _)| commit_ref.to_string())
1101                .join(","),
1102            pending_finalized_commits.len(),
1103            pending_finalized_commits
1104                .iter()
1105                .map(|(commit_ref, _)| commit_ref.to_string())
1106                .join(","),
1107        );
1108        self.store
1109            .write(WriteBatch::new(
1110                pending_blocks,
1111                pending_commits,
1112                pending_commit_info,
1113                pending_finalized_commits,
1114            ))
1115            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1116        self.context
1117            .metrics
1118            .node_metrics
1119            .dag_state_store_write_count
1120            .inc();
1121
1122        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
1123        for (authority_index, _) in self.context.committee.authorities() {
1124            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1125            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1126                if block_ref.round <= eviction_round {
1127                    self.recent_blocks.remove(block_ref);
1128                    self.recent_refs_by_authority[authority_index].pop_first();
1129                } else {
1130                    break;
1131                }
1132            }
1133            self.evicted_rounds[authority_index] = eviction_round;
1134        }
1135
1136        let metrics = &self.context.metrics.node_metrics;
1137        metrics
1138            .dag_state_recent_blocks
1139            .set(self.recent_blocks.len() as i64);
1140        metrics.dag_state_recent_refs.set(
1141            self.recent_refs_by_authority
1142                .iter()
1143                .map(BTreeSet::len)
1144                .sum::<usize>() as i64,
1145        );
1146    }
1147
1148    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1149        self.store
1150            .read_last_commit_info()
1151            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1152    }
1153
1154    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1155        self.scoring_subdag.add_subdags(scoring_subdags);
1156    }
1157
1158    pub(crate) fn clear_scoring_subdag(&mut self) {
1159        self.scoring_subdag.clear();
1160    }
1161
1162    pub(crate) fn scoring_subdags_count(&self) -> usize {
1163        self.scoring_subdag.scored_subdags_count()
1164    }
1165
1166    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1167        self.scoring_subdag.is_empty()
1168    }
1169
1170    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1171        self.scoring_subdag.calculate_distributed_vote_scores()
1172    }
1173
1174    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1175        self.scoring_subdag
1176            .commit_range
1177            .as_ref()
1178            .expect("commit range should exist for scoring subdag")
1179            .end()
1180    }
1181
1182    /// The last round that should get evicted after a cache clean up operation. After this round we are
1183    /// guaranteed to have all the produced blocks from that authority. For any round that is
1184    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1185    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1186        let last_round = self.recent_refs_by_authority[authority_index]
1187            .last()
1188            .map(|block_ref| block_ref.round)
1189            .unwrap_or(GENESIS_ROUND);
1190
1191        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1192    }
1193
1194    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1195    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1196    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1197        gc_round.min(last_round.saturating_sub(cached_rounds))
1198    }
1199
1200    /// Returns the underlying store.
1201    pub(crate) fn store(&self) -> Arc<dyn Store> {
1202        self.store.clone()
1203    }
1204
1205    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1206    /// the quorum even if that's genesis.
1207    #[cfg(test)]
1208    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1209        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1210        // a quorum then it means that our DAG has advanced with missing causal history.
1211        for round in
1212            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1213        {
1214            if round == GENESIS_ROUND {
1215                return self.genesis_blocks();
1216            }
1217            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1218            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1219
1220            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1221            let blocks = self.get_uncommitted_blocks_at_round(round);
1222            for block in &blocks {
1223                if quorum.add(block.author(), &self.context.committee) {
1224                    return blocks;
1225                }
1226            }
1227        }
1228
1229        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1230    }
1231
1232    #[cfg(test)]
1233    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1234        self.genesis.values().cloned().collect()
1235    }
1236
1237    #[cfg(test)]
1238    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1239        self.last_commit = Some(commit);
1240    }
1241}
1242
/// Per-block bookkeeping kept alongside a cached block: the block itself plus the
/// `committed` and `included` status flags.
struct BlockInfo {
    // The cached block that the flags below describe.
    block: VerifiedBlock,
    // Whether the block has been committed
    committed: bool,
    /// Whether the block has been included in the causal history of an owned proposed block.
    ///
    /// There are two usages of this field:
    /// 1. When proposing blocks, determine the set of blocks to carry votes for.
    /// 2. When recovering, determine if a block has not been included in a proposed block and
    ///    should recover transaction votes by voting.
    included: bool,
}
1255
1256impl BlockInfo {
1257    fn new(block: VerifiedBlock) -> Self {
1258        Self {
1259            block,
1260            committed: false,
1261            included: false,
1262        }
1263    }
1264}
1265
1266#[cfg(test)]
1267mod test {
1268    use std::vec;
1269
1270    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1271    use parking_lot::RwLock;
1272
1273    use super::*;
1274    use crate::{
1275        block::{TestBlock, VerifiedBlock},
1276        storage::{WriteBatch, mem_store::MemStore},
1277        test_dag_builder::DagBuilder,
1278        test_dag_parser::parse_dag,
1279    };
1280
1281    #[tokio::test]
1282    async fn test_get_blocks() {
1283        let (context, _) = Context::new_for_test(4);
1284        let context = Arc::new(context);
1285        let store = Arc::new(MemStore::new());
1286        let mut dag_state = DagState::new(context.clone(), store.clone());
1287        let own_index = AuthorityIndex::new_for_test(0);
1288
1289        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1290        let num_rounds: u32 = 10;
1291        let non_existent_round: u32 = 100;
1292        let num_authorities: u32 = 3;
1293        let num_blocks_per_slot: usize = 3;
1294        let mut blocks = BTreeMap::new();
1295        for round in 1..=num_rounds {
1296            for author in 0..num_authorities {
1297                // Create 3 blocks per slot, with different timestamps and digests.
1298                let base_ts = round as BlockTimestampMs * 1000;
1299                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1300                    let block = VerifiedBlock::new_for_test(
1301                        TestBlock::new(round, author)
1302                            .set_timestamp_ms(timestamp)
1303                            .build(),
1304                    );
1305                    dag_state.accept_block(block.clone());
1306                    blocks.insert(block.reference(), block);
1307
1308                    // Only write one block per slot for own index
1309                    if AuthorityIndex::new_for_test(author) == own_index {
1310                        break;
1311                    }
1312                }
1313            }
1314        }
1315
1316        // Check uncommitted blocks that exist.
1317        for (r, block) in &blocks {
1318            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1319        }
1320
1321        // Check uncommitted blocks that do not exist.
1322        let last_ref = blocks.keys().last().unwrap();
1323        assert!(
1324            dag_state
1325                .get_block(&BlockRef::new(
1326                    last_ref.round,
1327                    last_ref.author,
1328                    BlockDigest::MIN
1329                ))
1330                .is_none()
1331        );
1332
1333        // Check slots with uncommitted blocks.
1334        for round in 1..=num_rounds {
1335            for author in 0..num_authorities {
1336                let slot = Slot::new(
1337                    round,
1338                    context
1339                        .committee
1340                        .to_authority_index(author as usize)
1341                        .unwrap(),
1342                );
1343                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1344
1345                // We only write one block per slot for own index
1346                if AuthorityIndex::new_for_test(author) == own_index {
1347                    assert_eq!(blocks.len(), 1);
1348                } else {
1349                    assert_eq!(blocks.len(), num_blocks_per_slot);
1350                }
1351
1352                for b in blocks {
1353                    assert_eq!(b.round(), round);
1354                    assert_eq!(
1355                        b.author(),
1356                        context
1357                            .committee
1358                            .to_authority_index(author as usize)
1359                            .unwrap()
1360                    );
1361                }
1362            }
1363        }
1364
1365        // Check slots without uncommitted blocks.
1366        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1367        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1368
1369        // Check rounds with uncommitted blocks.
1370        for round in 1..=num_rounds {
1371            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1372            // Expect 3 blocks per authority except for own authority which should
1373            // have 1 block.
1374            assert_eq!(
1375                blocks.len(),
1376                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1377            );
1378            for b in blocks {
1379                assert_eq!(b.round(), round);
1380            }
1381        }
1382
1383        // Check rounds without uncommitted blocks.
1384        assert!(
1385            dag_state
1386                .get_uncommitted_blocks_at_round(non_existent_round)
1387                .is_empty()
1388        );
1389    }
1390
1391    #[tokio::test]
1392    async fn test_ancestors_at_uncommitted_round() {
1393        // Initialize DagState.
1394        let (context, _) = Context::new_for_test(4);
1395        let context = Arc::new(context);
1396        let store = Arc::new(MemStore::new());
1397        let mut dag_state = DagState::new(context.clone(), store.clone());
1398
1399        // Populate DagState.
1400
1401        // Round 10 refs will not have their blocks in DagState.
1402        let round_10_refs: Vec<_> = (0..4)
1403            .map(|a| {
1404                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1405                    .reference()
1406            })
1407            .collect();
1408
1409        // Round 11 blocks.
1410        let round_11 = [
1411            // This will connect to round 12.
1412            VerifiedBlock::new_for_test(
1413                TestBlock::new(11, 0)
1414                    .set_timestamp_ms(1100)
1415                    .set_ancestors(round_10_refs.clone())
1416                    .build(),
1417            ),
1418            // Slot(11, 1) has 3 blocks.
1419            // This will connect to round 12.
1420            VerifiedBlock::new_for_test(
1421                TestBlock::new(11, 1)
1422                    .set_timestamp_ms(1110)
1423                    .set_ancestors(round_10_refs.clone())
1424                    .build(),
1425            ),
1426            // This will connect to round 13.
1427            VerifiedBlock::new_for_test(
1428                TestBlock::new(11, 1)
1429                    .set_timestamp_ms(1111)
1430                    .set_ancestors(round_10_refs.clone())
1431                    .build(),
1432            ),
1433            // This will not connect to any block.
1434            VerifiedBlock::new_for_test(
1435                TestBlock::new(11, 1)
1436                    .set_timestamp_ms(1112)
1437                    .set_ancestors(round_10_refs.clone())
1438                    .build(),
1439            ),
1440            // This will not connect to any block.
1441            VerifiedBlock::new_for_test(
1442                TestBlock::new(11, 2)
1443                    .set_timestamp_ms(1120)
1444                    .set_ancestors(round_10_refs.clone())
1445                    .build(),
1446            ),
1447            // This will connect to round 12.
1448            VerifiedBlock::new_for_test(
1449                TestBlock::new(11, 3)
1450                    .set_timestamp_ms(1130)
1451                    .set_ancestors(round_10_refs.clone())
1452                    .build(),
1453            ),
1454        ];
1455
1456        // Round 12 blocks.
1457        let ancestors_for_round_12 = vec![
1458            round_11[0].reference(),
1459            round_11[1].reference(),
1460            round_11[5].reference(),
1461        ];
1462        let round_12 = [
1463            VerifiedBlock::new_for_test(
1464                TestBlock::new(12, 0)
1465                    .set_timestamp_ms(1200)
1466                    .set_ancestors(ancestors_for_round_12.clone())
1467                    .build(),
1468            ),
1469            VerifiedBlock::new_for_test(
1470                TestBlock::new(12, 2)
1471                    .set_timestamp_ms(1220)
1472                    .set_ancestors(ancestors_for_round_12.clone())
1473                    .build(),
1474            ),
1475            VerifiedBlock::new_for_test(
1476                TestBlock::new(12, 3)
1477                    .set_timestamp_ms(1230)
1478                    .set_ancestors(ancestors_for_round_12.clone())
1479                    .build(),
1480            ),
1481        ];
1482
1483        // Round 13 blocks.
1484        let ancestors_for_round_13 = vec![
1485            round_12[0].reference(),
1486            round_12[1].reference(),
1487            round_12[2].reference(),
1488            round_11[2].reference(),
1489        ];
1490        let round_13 = [
1491            VerifiedBlock::new_for_test(
1492                TestBlock::new(12, 1)
1493                    .set_timestamp_ms(1300)
1494                    .set_ancestors(ancestors_for_round_13.clone())
1495                    .build(),
1496            ),
1497            VerifiedBlock::new_for_test(
1498                TestBlock::new(12, 2)
1499                    .set_timestamp_ms(1320)
1500                    .set_ancestors(ancestors_for_round_13.clone())
1501                    .build(),
1502            ),
1503            VerifiedBlock::new_for_test(
1504                TestBlock::new(12, 3)
1505                    .set_timestamp_ms(1330)
1506                    .set_ancestors(ancestors_for_round_13.clone())
1507                    .build(),
1508            ),
1509        ];
1510
1511        // Round 14 anchor block.
1512        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1513        let anchor = VerifiedBlock::new_for_test(
1514            TestBlock::new(14, 1)
1515                .set_timestamp_ms(1410)
1516                .set_ancestors(ancestors_for_round_14)
1517                .build(),
1518        );
1519
1520        // Add all blocks (at and above round 11) to DagState.
1521        for b in round_11
1522            .iter()
1523            .chain(round_12.iter())
1524            .chain(round_13.iter())
1525            .chain([anchor.clone()].iter())
1526        {
1527            dag_state.accept_block(b.clone());
1528        }
1529
1530        // Check ancestors connected to anchor.
1531        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1532        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1533        ancestors_refs.sort();
1534        let mut expected_refs = vec![
1535            round_11[0].reference(),
1536            round_11[1].reference(),
1537            round_11[2].reference(),
1538            round_11[5].reference(),
1539        ];
1540        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1541        assert_eq!(
1542            ancestors_refs, expected_refs,
1543            "Expected round 11 ancestors: {:?}. Got: {:?}",
1544            expected_refs, ancestors_refs
1545        );
1546    }
1547
1548    #[tokio::test]
1549    async fn test_link_causal_history() {
1550        let (mut context, _) = Context::new_for_test(4);
1551        context.parameters.dag_state_cached_rounds = 10;
1552        context.protocol_config.set_gc_depth_for_testing(3);
1553        let context = Arc::new(context);
1554
1555        let store = Arc::new(MemStore::new());
1556        let mut dag_state = DagState::new(context.clone(), store.clone());
1557
1558        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1559        let mut dag_builder = DagBuilder::new(context.clone());
1560        dag_builder.layers(1..=3).build();
1561        dag_builder
1562            .layers(4..=6)
1563            .authorities(vec![AuthorityIndex::new_for_test(0)])
1564            .skip_block()
1565            .build();
1566
1567        // Accept all blocks
1568        let all_blocks = dag_builder.all_blocks();
1569        dag_state.accept_blocks(all_blocks.clone());
1570
1571        // No block is linked yet.
1572        for block in &all_blocks {
1573            assert!(!dag_state.has_been_included(&block.reference()));
1574        }
1575
1576        // Link causal history from a round 1 block.
1577        let round_1_block = &all_blocks[1];
1578        assert_eq!(round_1_block.round(), 1);
1579        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1580
1581        // Check that the block is linked.
1582        assert_eq!(linked_blocks.len(), 1);
1583        assert_eq!(linked_blocks[0], round_1_block.reference());
1584        for block_ref in linked_blocks {
1585            assert!(dag_state.has_been_included(&block_ref));
1586        }
1587
1588        // Link causal history from a round 2 block.
1589        let round_2_block = &all_blocks[4];
1590        assert_eq!(round_2_block.round(), 2);
1591        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1592
1593        // Check the linked blocks.
1594        assert_eq!(linked_blocks.len(), 4);
1595        for block_ref in linked_blocks {
1596            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1597        }
1598
1599        // Check linked status in dag state.
1600        for block in &all_blocks {
1601            if block.round() == 1 || block.reference() == round_2_block.reference() {
1602                assert!(dag_state.has_been_included(&block.reference()));
1603            } else {
1604                assert!(!dag_state.has_been_included(&block.reference()));
1605            }
1606        }
1607
1608        // Select round 6 block.
1609        let round_6_block = all_blocks.last().unwrap();
1610        assert_eq!(round_6_block.round(), 6);
1611
1612        // Get GC round to 3.
1613        let last_commit = TrustedCommit::new_for_test(
1614            6,
1615            CommitDigest::MIN,
1616            context.clock.timestamp_utc_ms(),
1617            round_6_block.reference(),
1618            vec![],
1619        );
1620        dag_state.set_last_commit(last_commit);
1621        assert_eq!(
1622            dag_state.gc_round(),
1623            3,
1624            "GC round should have moved to round 3"
1625        );
1626
1627        // Link causal history from a round 6 block.
1628        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1629
1630        // Check the linked blocks. They should not include GC'ed blocks.
1631        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1632        for block_ref in linked_blocks {
1633            assert!(
1634                block_ref.round == 4
1635                    || block_ref.round == 5
1636                    || block_ref == round_6_block.reference()
1637            );
1638        }
1639
1640        // Check linked status in dag state.
1641        for block in &all_blocks {
1642            let block_ref = block.reference();
1643            if block.round() == 1
1644                || block_ref == round_2_block.reference()
1645                || block_ref.round == 4
1646                || block_ref.round == 5
1647                || block_ref == round_6_block.reference()
1648            {
1649                assert!(dag_state.has_been_included(&block.reference()));
1650            } else {
1651                assert!(!dag_state.has_been_included(&block.reference()));
1652            }
1653        }
1654    }
1655
1656    #[tokio::test]
1657    async fn test_contains_blocks_in_cache_or_store() {
1658        /// Only keep elements up to 2 rounds before the last committed round
1659        const CACHED_ROUNDS: Round = 2;
1660
1661        let (mut context, _) = Context::new_for_test(4);
1662        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1663
1664        let context = Arc::new(context);
1665        let store = Arc::new(MemStore::new());
1666        let mut dag_state = DagState::new(context.clone(), store.clone());
1667
1668        // Create test blocks for round 1 ~ 10
1669        let num_rounds: u32 = 10;
1670        let num_authorities: u32 = 4;
1671        let mut blocks = Vec::new();
1672
1673        for round in 1..=num_rounds {
1674            for author in 0..num_authorities {
1675                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1676                blocks.push(block);
1677            }
1678        }
1679
1680        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1681        blocks.clone().into_iter().for_each(|block| {
1682            if block.round() <= 4 {
1683                store
1684                    .write(WriteBatch::default().blocks(vec![block]))
1685                    .unwrap();
1686            } else {
1687                dag_state.accept_blocks(vec![block]);
1688            }
1689        });
1690
1691        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1692        // where the blocks of first 4 round should be found in DagState and the rest in store.
1693        let mut block_refs = blocks
1694            .iter()
1695            .map(|block| block.reference())
1696            .collect::<Vec<_>>();
1697        let result = dag_state.contains_blocks(block_refs.clone());
1698
1699        // Ensure everything is found
1700        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1701        assert_eq!(result, expected);
1702
1703        // Now try to ask also for one block ref that is neither in cache nor in store
1704        block_refs.insert(
1705            3,
1706            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1707        );
1708        let result = dag_state.contains_blocks(block_refs.clone());
1709
1710        // Then all should be found apart from the last one
1711        expected.insert(3, false);
1712        assert_eq!(result, expected.clone());
1713    }
1714
1715    #[tokio::test]
1716    async fn test_contains_cached_block_at_slot() {
1717        /// Only keep elements up to 2 rounds before the last committed round
1718        const CACHED_ROUNDS: Round = 2;
1719
1720        let num_authorities: u32 = 4;
1721        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1722        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1723
1724        let context = Arc::new(context);
1725        let store = Arc::new(MemStore::new());
1726        let mut dag_state = DagState::new(context.clone(), store.clone());
1727
1728        // Create test blocks for round 1 ~ 10
1729        let num_rounds: u32 = 10;
1730        let mut blocks = Vec::new();
1731
1732        for round in 1..=num_rounds {
1733            for author in 0..num_authorities {
1734                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1735                blocks.push(block.clone());
1736                dag_state.accept_block(block);
1737            }
1738        }
1739
1740        // Query for genesis round 0, genesis blocks should be returned
1741        for (author, _) in context.committee.authorities() {
1742            assert!(
1743                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1744                "Genesis should always be found"
1745            );
1746        }
1747
1748        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1749        // where the blocks of first 4 round should be found in DagState and the rest in store.
1750        let mut block_refs = blocks
1751            .iter()
1752            .map(|block| block.reference())
1753            .collect::<Vec<_>>();
1754
1755        for block_ref in block_refs.clone() {
1756            let slot = block_ref.into();
1757            let found = dag_state.contains_cached_block_at_slot(slot);
1758            assert!(found, "A block should be found at slot {}", slot);
1759        }
1760
1761        // Now try to ask also for one block ref that is not in cache
1762        // Then all should be found apart from the last one
1763        block_refs.insert(
1764            3,
1765            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1766        );
1767        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1768        expected.insert(3, false);
1769
1770        // Attempt to check the same for via the contains slot method
1771        for block_ref in block_refs {
1772            let slot = block_ref.into();
1773            let found = dag_state.contains_cached_block_at_slot(slot);
1774
1775            assert_eq!(expected.remove(0), found);
1776        }
1777    }
1778
1779    #[tokio::test]
1780    #[ignore]
1781    #[should_panic(
1782        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1783    )]
1784    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1785        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1786        /// for the correct node operation.
1787        const GC_DEPTH: u32 = 2;
1788        /// Keep at least 3 rounds in cache for each authority.
1789        const CACHED_ROUNDS: Round = 3;
1790
1791        let (mut context, _) = Context::new_for_test(4);
1792        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1793        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1794
1795        let context = Arc::new(context);
1796        let store = Arc::new(MemStore::new());
1797        let mut dag_state = DagState::new(context.clone(), store.clone());
1798
1799        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1800        let mut dag_builder = DagBuilder::new(context.clone());
1801        dag_builder.layers(1..=3).build();
1802        dag_builder
1803            .layers(4..=6)
1804            .authorities(vec![AuthorityIndex::new_for_test(0)])
1805            .skip_block()
1806            .build();
1807
1808        // Accept all blocks
1809        dag_builder
1810            .all_blocks()
1811            .into_iter()
1812            .for_each(|block| dag_state.accept_block(block));
1813
1814        // Now add a commit for leader round 5 to trigger an eviction
1815        dag_state.add_commit(TrustedCommit::new_for_test(
1816            1 as CommitIndex,
1817            CommitDigest::MIN,
1818            0,
1819            dag_builder.leader_block(5).unwrap().reference(),
1820            vec![],
1821        ));
1822        // Flush the DAG state to storage.
1823        dag_state.flush();
1824
1825        // Ensure that gc round has been updated
1826        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1827
1828        // Now what we expect to happen is for:
1829        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1830        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1831        for authority_index in 1..=3 {
1832            for round in 4..=6 {
1833                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1834                    round,
1835                    AuthorityIndex::new_for_test(authority_index)
1836                )));
1837            }
1838        }
1839
1840        for round in 1..=3 {
1841            assert!(
1842                dag_state.contains_cached_block_at_slot(Slot::new(
1843                    round,
1844                    AuthorityIndex::new_for_test(0)
1845                ))
1846            );
1847        }
1848
1849        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1850        // that is <= 3 should be evicted
1851        let _ =
1852            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1853    }
1854
1855    #[tokio::test]
1856    async fn test_get_blocks_in_cache_or_store() {
1857        let (context, _) = Context::new_for_test(4);
1858        let context = Arc::new(context);
1859        let store = Arc::new(MemStore::new());
1860        let mut dag_state = DagState::new(context.clone(), store.clone());
1861
1862        // Create test blocks for round 1 ~ 10
1863        let num_rounds: u32 = 10;
1864        let num_authorities: u32 = 4;
1865        let mut blocks = Vec::new();
1866
1867        for round in 1..=num_rounds {
1868            for author in 0..num_authorities {
1869                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1870                blocks.push(block);
1871            }
1872        }
1873
1874        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1875        blocks.clone().into_iter().for_each(|block| {
1876            if block.round() <= 4 {
1877                store
1878                    .write(WriteBatch::default().blocks(vec![block]))
1879                    .unwrap();
1880            } else {
1881                dag_state.accept_blocks(vec![block]);
1882            }
1883        });
1884
1885        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1886        // where the blocks of first 4 round should be found in DagState and the rest in store.
1887        let mut block_refs = blocks
1888            .iter()
1889            .map(|block| block.reference())
1890            .collect::<Vec<_>>();
1891        let result = dag_state.get_blocks(&block_refs);
1892
1893        let mut expected = blocks
1894            .into_iter()
1895            .map(Some)
1896            .collect::<Vec<Option<VerifiedBlock>>>();
1897
1898        // Ensure everything is found
1899        assert_eq!(result, expected.clone());
1900
1901        // Now try to ask also for one block ref that is neither in cache nor in store
1902        block_refs.insert(
1903            3,
1904            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1905        );
1906        let result = dag_state.get_blocks(&block_refs);
1907
1908        // Then all should be found apart from the last one
1909        expected.insert(3, None);
1910        assert_eq!(result, expected);
1911    }
1912
    // End-to-end check of `flush()` followed by recovery of a fresh `DagState` from the same
    // store. Only data flushed before the original DagState was dropped is expected to be visible
    // afterwards: blocks, commits, last committed rounds, eviction rounds and the per-block
    // `committed` / `included` flags.
    #[tokio::test]
    async fn test_flush_and_recovery() {
        telemetry_subscribers::init_for_testing();

        const GC_DEPTH: u32 = 3;
        const CACHED_ROUNDS: u32 = 4;

        let num_authorities: u32 = 4;
        let (mut context, _) = Context::new_for_test(num_authorities as usize);
        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);

        let context = Arc::new(context);

        let store = Arc::new(MemStore::new());
        let mut dag_state = DagState::new(context.clone(), store.clone());

        // Build a DAG of NUM_ROUNDS rounds where authority 0 produces no blocks for
        // rounds 6..=8.
        const NUM_ROUNDS: Round = 20;
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=5).build();
        dag_builder
            .layers(6..=8)
            .authorities(vec![AuthorityIndex::new_for_test(0)])
            .skip_block()
            .build();
        dag_builder.layers(9..=NUM_ROUNDS).build();

        // Get the first LAST_COMMIT_INDEX commits from the DAG builder.
        const LAST_COMMIT_ROUND: Round = 16;
        const LAST_COMMIT_INDEX: CommitIndex = 15;
        let commits = dag_builder
            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
            .into_iter()
            .map(|(_subdag, commit)| commit)
            .take(LAST_COMMIT_INDEX as usize)
            .collect::<Vec<_>>();
        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);

        // Add the blocks from the first 12 rounds (PERSISTED_BLOCK_ROUNDS) and the first 8
        // commits to the dag state.
        // Note that there is no commit with leader round 8, because authority 0 is the leader of
        // that round but produced no block. So commit 8 has leader round 9.
        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
        const NUM_PERSISTED_COMMITS: usize = 8;
        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
        let mut finalized_commits = vec![];
        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
            finalized_commits.push(commit.clone());
            dag_state.add_commit(commit);
        }
        let last_finalized_commit = finalized_commits.last().unwrap();
        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);

        // Collect the refs of all blocks referenced by the commits added so far.
        let finalized_blocks = finalized_commits
            .iter()
            .flat_map(|commit| commit.blocks())
            .collect::<BTreeSet<_>>();

        // Flush the blocks and commits accepted so far from the dag state to the store.
        dag_state.flush();

        // Verify the store has blocks up to round 12, and commits up to index 8.
        let store_blocks = store
            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
            .unwrap();
        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
        assert_eq!(
            store_commits.last().unwrap().index(),
            LAST_PERSISTED_COMMIT_INDEX
        );
        assert_eq!(
            store_commits.last().unwrap().round(),
            LAST_PERSISTED_COMMIT_ROUND
        );

        // Add the rest of the blocks and commits to the dag state. These are intentionally
        // never flushed.
        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
            dag_state.add_commit(commit);
        }

        // All blocks should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Last commit index from DagState should now be 15
        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);

        // Destroy the dag state without flushing additional data.
        drop(dag_state);

        // Recover the state from the store
        let dag_state = DagState::new(context.clone(), store.clone());

        // Blocks of the persisted rounds should be found in DagState.
        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
        let block_refs = all_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let result = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .map(|b| b.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(result, all_blocks);

        // Unpersisted blocks should not be in DagState, because they are not flushed.
        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
        let block_refs = missing_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<Vec<_>>();
        let retrieved_blocks = dag_state
            .get_blocks(&block_refs)
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        assert!(retrieved_blocks.is_empty());

        // Recovered last commit index and round should be 8 and 9.
        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);

        // The last_commit_rounds of the finalized commits should have been recovered.
        let expected_last_committed_rounds = vec![5, 9, 8, 8];
        assert_eq!(
            dag_state.last_committed_rounds(),
            expected_last_committed_rounds
        );
        // Unscored subdags will be recovered based on the flushed commits and no commit info.
        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);

        // Ensure that cached blocks exist only for specific rounds per authority
        for (authority_index, _) in context.committee.authorities() {
            let blocks = dag_state.get_cached_blocks(authority_index, 1);

            // Ensure that eviction rounds have been properly recovered.
            // For every authority, the gc round is 9 - 3 = 6 (last persisted commit round minus
            // GC_DEPTH), and the asserted eviction round is 6, so cached blocks start at round 7.
            // NOTE(review): this comment previously gave the cached-round bound as "12-5 = 7",
            // but CACHED_ROUNDS is 4 in this test (12 - 4 = 8) — confirm the intended arithmetic.
            if authority_index == AuthorityIndex::new_for_test(0) {
                // Authority 0 has no blocks for rounds 6..=8, so only 4 blocks are expected.
                assert_eq!(blocks.len(), 4);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            } else {
                // Other authorities have one block per round for rounds 7..=12.
                assert_eq!(blocks.len(), 6);
                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
                assert!(
                    blocks
                        .into_iter()
                        .all(|block| block.round() >= 7 && block.round() <= 12)
                );
            }
        }

        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
        let gc_round = dag_state.gc_round();
        assert_eq!(gc_round, 6);
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
                    assert!(
                        block_info.committed,
                        "Block {:?} should be set as committed",
                        block_ref
                    );
                }
            });

        // Ensure the hard linked status of blocks are recovered.
        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
        // Other blocks (round 12 but not from authority 0) should not be hard linked.
        // This is because authority 0 blocks are considered proposed blocks.
        dag_state
            .recent_blocks
            .iter()
            .for_each(|(block_ref, block_info)| {
                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
                    assert!(block_info.included);
                } else {
                    assert!(!block_info.included);
                }
            });
    }
2118
2119    #[tokio::test]
2120    async fn test_block_info_as_committed() {
2121        let num_authorities: u32 = 4;
2122        let (context, _) = Context::new_for_test(num_authorities as usize);
2123        let context = Arc::new(context);
2124
2125        let store = Arc::new(MemStore::new());
2126        let mut dag_state = DagState::new(context.clone(), store.clone());
2127
2128        // Accept a block
2129        let block = VerifiedBlock::new_for_test(
2130            TestBlock::new(1, 0)
2131                .set_timestamp_ms(1000)
2132                .set_ancestors(vec![])
2133                .build(),
2134        );
2135
2136        dag_state.accept_block(block.clone());
2137
2138        // Query is committed
2139        assert!(!dag_state.is_committed(&block.reference()));
2140
2141        // Set block as committed for first time should return true
2142        assert!(
2143            dag_state.set_committed(&block.reference()),
2144            "Block should be successfully set as committed for first time"
2145        );
2146
2147        // Now it should appear as committed
2148        assert!(dag_state.is_committed(&block.reference()));
2149
2150        // Trying to set the block as committed again, it should return false.
2151        assert!(
2152            !dag_state.set_committed(&block.reference()),
2153            "Block should not be successfully set as committed"
2154        );
2155    }
2156
2157    #[tokio::test]
2158    async fn test_get_cached_blocks() {
2159        let (mut context, _) = Context::new_for_test(4);
2160        context.parameters.dag_state_cached_rounds = 5;
2161
2162        let context = Arc::new(context);
2163        let store = Arc::new(MemStore::new());
2164        let mut dag_state = DagState::new(context.clone(), store.clone());
2165
2166        // Create no blocks for authority 0
2167        // Create one block (round 10) for authority 1
2168        // Create two blocks (rounds 10,11) for authority 2
2169        // Create three blocks (rounds 10,11,12) for authority 3
2170        let mut all_blocks = Vec::new();
2171        for author in 1..=3 {
2172            for round in 10..(10 + author) {
2173                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2174                all_blocks.push(block.clone());
2175                dag_state.accept_block(block);
2176            }
2177        }
2178
2179        // Test get_cached_blocks()
2180
2181        let cached_blocks =
2182            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2183        assert!(cached_blocks.is_empty());
2184
2185        let cached_blocks =
2186            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2187        assert_eq!(cached_blocks.len(), 1);
2188        assert_eq!(cached_blocks[0].round(), 10);
2189
2190        let cached_blocks =
2191            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2192        assert_eq!(cached_blocks.len(), 2);
2193        assert_eq!(cached_blocks[0].round(), 10);
2194        assert_eq!(cached_blocks[1].round(), 11);
2195
2196        let cached_blocks =
2197            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2198        assert_eq!(cached_blocks.len(), 1);
2199        assert_eq!(cached_blocks[0].round(), 11);
2200
2201        let cached_blocks =
2202            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2203        assert_eq!(cached_blocks.len(), 3);
2204        assert_eq!(cached_blocks[0].round(), 10);
2205        assert_eq!(cached_blocks[1].round(), 11);
2206        assert_eq!(cached_blocks[2].round(), 12);
2207
2208        let cached_blocks =
2209            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2210        assert_eq!(cached_blocks.len(), 1);
2211        assert_eq!(cached_blocks[0].round(), 12);
2212
2213        // Test get_cached_blocks_in_range()
2214
2215        // Start == end
2216        let cached_blocks = dag_state.get_cached_blocks_in_range(
2217            context.committee.to_authority_index(3).unwrap(),
2218            10,
2219            10,
2220            1,
2221        );
2222        assert!(cached_blocks.is_empty());
2223
2224        // Start > end
2225        let cached_blocks = dag_state.get_cached_blocks_in_range(
2226            context.committee.to_authority_index(3).unwrap(),
2227            11,
2228            10,
2229            1,
2230        );
2231        assert!(cached_blocks.is_empty());
2232
2233        // Empty result.
2234        let cached_blocks = dag_state.get_cached_blocks_in_range(
2235            context.committee.to_authority_index(0).unwrap(),
2236            9,
2237            10,
2238            1,
2239        );
2240        assert!(cached_blocks.is_empty());
2241
2242        // Single block, one round before the end.
2243        let cached_blocks = dag_state.get_cached_blocks_in_range(
2244            context.committee.to_authority_index(1).unwrap(),
2245            9,
2246            11,
2247            1,
2248        );
2249        assert_eq!(cached_blocks.len(), 1);
2250        assert_eq!(cached_blocks[0].round(), 10);
2251
2252        // Respect end round.
2253        let cached_blocks = dag_state.get_cached_blocks_in_range(
2254            context.committee.to_authority_index(2).unwrap(),
2255            9,
2256            12,
2257            5,
2258        );
2259        assert_eq!(cached_blocks.len(), 2);
2260        assert_eq!(cached_blocks[0].round(), 10);
2261        assert_eq!(cached_blocks[1].round(), 11);
2262
2263        // Respect start round.
2264        let cached_blocks = dag_state.get_cached_blocks_in_range(
2265            context.committee.to_authority_index(3).unwrap(),
2266            11,
2267            20,
2268            5,
2269        );
2270        assert_eq!(cached_blocks.len(), 2);
2271        assert_eq!(cached_blocks[0].round(), 11);
2272        assert_eq!(cached_blocks[1].round(), 12);
2273
2274        // Respect limit
2275        let cached_blocks = dag_state.get_cached_blocks_in_range(
2276            context.committee.to_authority_index(3).unwrap(),
2277            10,
2278            20,
2279            1,
2280        );
2281        assert_eq!(cached_blocks.len(), 1);
2282        assert_eq!(cached_blocks[0].round(), 10);
2283    }
2284
2285    #[tokio::test]
2286    async fn test_get_last_cached_block() {
2287        // GIVEN
2288        const CACHED_ROUNDS: Round = 2;
2289        const GC_DEPTH: u32 = 1;
2290        let (mut context, _) = Context::new_for_test(4);
2291        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2292        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2293
2294        let context = Arc::new(context);
2295        let store = Arc::new(MemStore::new());
2296        let mut dag_state = DagState::new(context.clone(), store.clone());
2297
2298        // Create no blocks for authority 0
2299        // Create one block (round 1) for authority 1
2300        // Create two blocks (rounds 1,2) for authority 2
2301        // Create three blocks (rounds 1,2,3) for authority 3
2302        let dag_str = "DAG {
2303            Round 0 : { 4 },
2304            Round 1 : {
2305                B -> [*],
2306                C -> [*],
2307                D -> [*],
2308            },
2309            Round 2 : {
2310                C -> [*],
2311                D -> [*],
2312            },
2313            Round 3 : {
2314                D -> [*],
2315            },
2316        }";
2317
2318        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2319
2320        // Add equivocating block for round 2 authority 3
2321        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2322
2323        // Accept all blocks
2324        for block in dag_builder
2325            .all_blocks()
2326            .into_iter()
2327            .chain(std::iter::once(block))
2328        {
2329            dag_state.accept_block(block);
2330        }
2331
2332        dag_state.add_commit(TrustedCommit::new_for_test(
2333            1 as CommitIndex,
2334            CommitDigest::MIN,
2335            context.clock.timestamp_utc_ms(),
2336            dag_builder.leader_block(3).unwrap().reference(),
2337            vec![],
2338        ));
2339
2340        // WHEN search for the latest blocks
2341        let end_round = 4;
2342        let expected_rounds = vec![0, 1, 2, 3];
2343        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2344        // THEN
2345        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2346        assert_eq!(
2347            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2348            expected_rounds
2349        );
2350        assert_eq!(
2351            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2352            expected_excluded_and_equivocating_blocks
2353        );
2354
2355        // THEN
2356        for (i, expected_round) in expected_rounds.iter().enumerate() {
2357            let round = dag_state
2358                .get_last_cached_block_in_range(
2359                    context.committee.to_authority_index(i).unwrap(),
2360                    0,
2361                    end_round,
2362                )
2363                .map(|b| b.round())
2364                .unwrap_or_default();
2365            assert_eq!(round, *expected_round, "Authority {i}");
2366        }
2367
2368        // WHEN starting from round 2
2369        let start_round = 2;
2370        let expected_rounds = [0, 0, 2, 3];
2371
2372        // THEN
2373        for (i, expected_round) in expected_rounds.iter().enumerate() {
2374            let round = dag_state
2375                .get_last_cached_block_in_range(
2376                    context.committee.to_authority_index(i).unwrap(),
2377                    start_round,
2378                    end_round,
2379                )
2380                .map(|b| b.round())
2381                .unwrap_or_default();
2382            assert_eq!(round, *expected_round, "Authority {i}");
2383        }
2384
2385        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
2386        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
2387        //
2388        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
2389        // all their highest round blocks for CACHED_ROUNDS.
2390        dag_state.flush();
2391
2392        // AND we request before round 3
2393        let end_round = 3;
2394        let expected_rounds = vec![0, 1, 2, 2];
2395
2396        // THEN
2397        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2398        assert_eq!(
2399            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2400            expected_rounds
2401        );
2402
2403        // THEN
2404        for (i, expected_round) in expected_rounds.iter().enumerate() {
2405            let round = dag_state
2406                .get_last_cached_block_in_range(
2407                    context.committee.to_authority_index(i).unwrap(),
2408                    0,
2409                    end_round,
2410                )
2411                .map(|b| b.round())
2412                .unwrap_or_default();
2413            assert_eq!(round, *expected_round, "Authority {i}");
2414        }
2415    }
2416
2417    #[tokio::test]
2418    #[should_panic(
2419        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2420    )]
2421    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2422        // GIVEN
2423        const CACHED_ROUNDS: Round = 1;
2424        const GC_DEPTH: u32 = 1;
2425        let (mut context, _) = Context::new_for_test(4);
2426        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2427        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2428
2429        let context = Arc::new(context);
2430        let store = Arc::new(MemStore::new());
2431        let mut dag_state = DagState::new(context.clone(), store.clone());
2432
2433        // Create no blocks for authority 0
2434        // Create one block (round 1) for authority 1
2435        // Create two blocks (rounds 1,2) for authority 2
2436        // Create three blocks (rounds 1,2,3) for authority 3
2437        let mut dag_builder = DagBuilder::new(context.clone());
2438        dag_builder
2439            .layers(1..=1)
2440            .authorities(vec![AuthorityIndex::new_for_test(0)])
2441            .skip_block()
2442            .build();
2443        dag_builder
2444            .layers(2..=2)
2445            .authorities(vec![
2446                AuthorityIndex::new_for_test(0),
2447                AuthorityIndex::new_for_test(1),
2448            ])
2449            .skip_block()
2450            .build();
2451        dag_builder
2452            .layers(3..=3)
2453            .authorities(vec![
2454                AuthorityIndex::new_for_test(0),
2455                AuthorityIndex::new_for_test(1),
2456                AuthorityIndex::new_for_test(2),
2457            ])
2458            .skip_block()
2459            .build();
2460
2461        // Accept all blocks
2462        for block in dag_builder.all_blocks() {
2463            dag_state.accept_block(block);
2464        }
2465
2466        dag_state.add_commit(TrustedCommit::new_for_test(
2467            1 as CommitIndex,
2468            CommitDigest::MIN,
2469            0,
2470            dag_builder.leader_block(3).unwrap().reference(),
2471            vec![],
2472        ));
2473
2474        // Flush the store so we update the evict rounds
2475        dag_state.flush();
2476
2477        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2478        dag_state.get_last_cached_block_per_authority(2);
2479    }
2480
2481    #[tokio::test]
2482    async fn test_last_quorum() {
2483        // GIVEN
2484        let (context, _) = Context::new_for_test(4);
2485        let context = Arc::new(context);
2486        let store = Arc::new(MemStore::new());
2487        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2488
2489        // WHEN no blocks exist then genesis should be returned
2490        {
2491            let genesis = genesis_blocks(context.as_ref());
2492
2493            assert_eq!(dag_state.read().last_quorum(), genesis);
2494        }
2495
2496        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2497        {
2498            let mut dag_builder = DagBuilder::new(context.clone());
2499            dag_builder
2500                .layers(1..=4)
2501                .build()
2502                .persist_layers(dag_state.clone());
2503            let round_4_blocks: Vec<_> = dag_builder
2504                .blocks(4..=4)
2505                .into_iter()
2506                .map(|block| block.reference())
2507                .collect();
2508
2509            let last_quorum = dag_state.read().last_quorum();
2510
2511            assert_eq!(
2512                last_quorum
2513                    .into_iter()
2514                    .map(|block| block.reference())
2515                    .collect::<Vec<_>>(),
2516                round_4_blocks
2517            );
2518        }
2519
2520        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2521        {
2522            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2523            dag_state.write().accept_block(block);
2524
2525            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2526
2527            let last_quorum = dag_state.read().last_quorum();
2528
2529            assert_eq!(last_quorum, round_4_blocks);
2530        }
2531    }
2532
2533    #[tokio::test]
2534    async fn test_last_block_for_authority() {
2535        // GIVEN
2536        let (context, _) = Context::new_for_test(4);
2537        let context = Arc::new(context);
2538        let store = Arc::new(MemStore::new());
2539        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2540
2541        // WHEN no blocks exist then genesis should be returned
2542        {
2543            let genesis = genesis_blocks(context.as_ref());
2544            let my_genesis = genesis
2545                .into_iter()
2546                .find(|block| block.author() == context.own_index)
2547                .unwrap();
2548
2549            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2550        }
2551
2552        // WHEN adding some blocks for authorities, only the last ones should be returned
2553        {
2554            // add blocks up to round 4
2555            let mut dag_builder = DagBuilder::new(context.clone());
2556            dag_builder
2557                .layers(1..=4)
2558                .build()
2559                .persist_layers(dag_state.clone());
2560
2561            // add block 5 for authority 0
2562            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2563            dag_state.write().accept_block(block);
2564
2565            let block = dag_state
2566                .read()
2567                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2568            assert_eq!(block.round(), 5);
2569
2570            for (authority_index, _) in context.committee.authorities() {
2571                let block = dag_state
2572                    .read()
2573                    .get_last_block_for_authority(authority_index);
2574
2575                if authority_index.value() == 0 {
2576                    assert_eq!(block.round(), 5);
2577                } else {
2578                    assert_eq!(block.round(), 4);
2579                }
2580            }
2581        }
2582    }
2583
2584    #[tokio::test]
2585    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2586        // GIVEN
2587        let (context, _) = Context::new_for_test(4);
2588        let context = Arc::new(context);
2589        let store = Arc::new(MemStore::new());
2590        let mut dag_state = DagState::new(context.clone(), store.clone());
2591
2592        // Set a timestamp for the block that is ahead of the current time
2593        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2594
2595        let block = VerifiedBlock::new_for_test(
2596            TestBlock::new(10, 0)
2597                .set_timestamp_ms(block_timestamp)
2598                .build(),
2599        );
2600
2601        // Try to accept the block - it should not panic
2602        dag_state.accept_block(block);
2603    }
2604
2605    #[tokio::test]
2606    async fn test_last_finalized_commit() {
2607        // GIVEN
2608        let (context, _) = Context::new_for_test(4);
2609        let context = Arc::new(context);
2610        let store = Arc::new(MemStore::new());
2611        let mut dag_state = DagState::new(context.clone(), store.clone());
2612
2613        // WHEN adding a finalized commit
2614        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2615        let rejected_transactions = BTreeMap::new();
2616        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2617
2618        // THEN the commit should be added to the buffer
2619        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2620        assert_eq!(
2621            dag_state.finalized_commits_to_write[0],
2622            (commit_ref, rejected_transactions.clone())
2623        );
2624
2625        // WHEN flushing the DAG state
2626        dag_state.flush();
2627
2628        // THEN the commit and rejected transactions should be written to storage
2629        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2630        assert_eq!(last_finalized_commit, Some(commit_ref));
2631        let stored_rejected_transactions = store
2632            .read_rejected_transactions(commit_ref)
2633            .unwrap()
2634            .unwrap();
2635        assert_eq!(stored_rejected_transactions, rejected_transactions);
2636    }
2637}