consensus_core/
dag_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::max,
6    collections::{BTreeMap, BTreeSet, VecDeque},
7    ops::Bound::{Excluded, Included, Unbounded},
8    panic,
9    sync::Arc,
10    time::Duration,
11    vec,
12};
13
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use itertools::Itertools as _;
17use tokio::time::Instant;
18use tracing::{debug, error, info, trace};
19
20use crate::{
21    CommittedSubDag,
22    block::{BlockAPI, GENESIS_ROUND, Slot, VerifiedBlock, genesis_blocks},
23    commit::{
24        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRef, CommitVote,
25        GENESIS_COMMIT_INDEX, TrustedCommit, load_committed_subdag_from_store,
26    },
27    context::Context,
28    leader_scoring::{ReputationScores, ScoringSubdag},
29    storage::{Store, WriteBatch},
30    threshold_clock::ThresholdClock,
31};
32
33/// DagState provides the API to write and read accepted blocks from the DAG.
34/// Only uncommitted and last committed blocks are cached in memory.
35/// The rest of blocks are stored on disk.
36/// Refs to cached blocks and additional refs are cached as well, to speed up existence checks.
37///
38/// Note: DagState should be wrapped with Arc<parking_lot::RwLock<_>>, to allow
39/// concurrent access from multiple components.
40pub struct DagState {
41    context: Arc<Context>,
42
43    // The genesis blocks
44    genesis: BTreeMap<BlockRef, VerifiedBlock>,
45
46    // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority.
47    // Note: all uncommitted blocks are kept in memory.
48    //
49    // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always have available
50    // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS.
51    // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`.
52    // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached.
53    recent_blocks: BTreeMap<BlockRef, BlockInfo>,
54
55    // Indexes recent block refs by their authorities.
56    // Vec position corresponds to the authority index.
57    recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,
58
59    // Keeps track of the threshold clock for proposing blocks.
60    threshold_clock: ThresholdClock,
61
62    // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
63    // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear.
64    // The `evicted_rounds` size should be the same as the committee size.
65    evicted_rounds: Vec<Round>,
66
67    // Highest round of blocks accepted.
68    highest_accepted_round: Round,
69
70    // Last consensus commit of the dag.
71    last_commit: Option<TrustedCommit>,
72
73    // Last wall time when commit round advanced. Does not persist across restarts.
74    last_commit_round_advancement_time: Option<std::time::Instant>,
75
76    // Last committed rounds per authority.
77    last_committed_rounds: Vec<Round>,
78
79    /// The committed subdags that have been scored but scores have not been used
80    /// for leader schedule yet.
81    scoring_subdag: ScoringSubdag,
82
83    // Commit votes pending to be included in new blocks.
84    // TODO: limit to 1st commit per round with multi-leader.
85    // TODO: recover unproposed pending commit votes at startup.
86    pending_commit_votes: VecDeque<CommitVote>,
87
88    // Blocks and commits must be buffered for persistence before they can be
89    // inserted into the local DAG or sent to output.
90    blocks_to_write: Vec<VerifiedBlock>,
91    commits_to_write: Vec<TrustedCommit>,
92
93    // Buffers the reputation scores & last_committed_rounds to be flushed with the
94    // next dag state flush. Not writing eagerly is okay because we can recover reputation scores
95    // & last_committed_rounds from the commits as needed.
96    commit_info_to_write: Vec<(CommitRef, CommitInfo)>,
97
98    // Buffers finalized commits and their rejected transactions to be written to storage.
99    finalized_commits_to_write: Vec<(CommitRef, BTreeMap<BlockRef, Vec<TransactionIndex>>)>,
100
101    // Persistent storage for blocks, commits and other consensus data.
102    store: Arc<dyn Store>,
103
104    // The number of cached rounds
105    cached_rounds: Round,
106}
107
108impl DagState {
109    /// Initializes DagState from storage.
110    pub fn new(context: Arc<Context>, store: Arc<dyn Store>) -> Self {
111        let cached_rounds = context.parameters.dag_state_cached_rounds as Round;
112        let num_authorities = context.committee.size();
113
114        let genesis = genesis_blocks(context.as_ref())
115            .into_iter()
116            .map(|block| (block.reference(), block))
117            .collect();
118
119        let threshold_clock = ThresholdClock::new(1, context.clone());
120
121        let last_commit = store
122            .read_last_commit()
123            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
124
125        let commit_info = store
126            .read_last_commit_info()
127            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
128        let (mut last_committed_rounds, commit_recovery_start_index) =
129            if let Some((commit_ref, commit_info)) = commit_info {
130                tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
131                (commit_info.committed_rounds, commit_ref.index + 1)
132            } else {
133                tracing::info!("Found no stored CommitInfo to recover from");
134                (vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
135            };
136
137        let mut unscored_committed_subdags = Vec::new();
138        let mut scoring_subdag = ScoringSubdag::new(context.clone());
139
140        if let Some(last_commit) = last_commit.as_ref() {
141            store
142                .scan_commits((commit_recovery_start_index..=last_commit.index()).into())
143                .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
144                .iter()
145                .for_each(|commit| {
146                    for block_ref in commit.blocks() {
147                        last_committed_rounds[block_ref.author] =
148                            max(last_committed_rounds[block_ref.author], block_ref.round);
149                    }
150                    let committed_subdag =
151                        load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
152                    // We don't need to recover reputation scores for unscored_committed_subdags
153                    unscored_committed_subdags.push(committed_subdag);
154                });
155        }
156
157        tracing::info!(
158            "DagState was initialized with the following state: \
159            {last_commit:?}; {last_committed_rounds:?}; {} unscored committed subdags;",
160            unscored_committed_subdags.len()
161        );
162
163        scoring_subdag.add_subdags(std::mem::take(&mut unscored_committed_subdags));
164
165        let mut state = Self {
166            context: context.clone(),
167            genesis,
168            recent_blocks: BTreeMap::new(),
169            recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
170            threshold_clock,
171            highest_accepted_round: 0,
172            last_commit: last_commit.clone(),
173            last_commit_round_advancement_time: None,
174            last_committed_rounds: last_committed_rounds.clone(),
175            pending_commit_votes: VecDeque::new(),
176            blocks_to_write: vec![],
177            commits_to_write: vec![],
178            commit_info_to_write: vec![],
179            finalized_commits_to_write: vec![],
180            scoring_subdag,
181            store: store.clone(),
182            cached_rounds,
183            evicted_rounds: vec![0; num_authorities],
184        };
185
186        for (authority_index, _) in context.committee.authorities() {
187            let (blocks, eviction_round) = {
188                // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only.
189                // As reminder, the eviction round is taking into account the gc_round.
190                let last_block = state
191                    .store
192                    .scan_last_blocks_by_author(authority_index, 1, None)
193                    .expect("Database error");
194                let last_block_round = last_block
195                    .last()
196                    .map(|b| b.round())
197                    .unwrap_or(GENESIS_ROUND);
198
199                let eviction_round =
200                    Self::eviction_round(last_block_round, state.gc_round(), state.cached_rounds);
201                let blocks = state
202                    .store
203                    .scan_blocks_by_author(authority_index, eviction_round + 1)
204                    .expect("Database error");
205
206                (blocks, eviction_round)
207            };
208
209            state.evicted_rounds[authority_index] = eviction_round;
210
211            // Update the block metadata for the authority.
212            for block in &blocks {
213                state.update_block_metadata(block);
214            }
215
216            debug!(
217                "Recovered blocks {}: {:?}",
218                authority_index,
219                blocks
220                    .iter()
221                    .map(|b| b.reference())
222                    .collect::<Vec<BlockRef>>()
223            );
224        }
225
226        if let Some(last_commit) = last_commit {
227            let mut index = last_commit.index();
228            let gc_round = state.gc_round();
229            info!(
230                "Recovering block commit statuses from commit index {} and backwards until leader of round <= gc_round {:?}",
231                index, gc_round
232            );
233
234            loop {
235                let commits = store
236                    .scan_commits((index..=index).into())
237                    .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
238                let Some(commit) = commits.first() else {
239                    info!("Recovering finished up to index {index}, no more commits to recover");
240                    break;
241                };
242
243                // Check the commit leader round to see if it is within the gc_round. If it is not then we can stop the recovery process.
244                if gc_round > 0 && commit.leader().round <= gc_round {
245                    info!(
246                        "Recovering finished, reached commit leader round {} <= gc_round {}",
247                        commit.leader().round,
248                        gc_round
249                    );
250                    break;
251                }
252
253                commit.blocks().iter().filter(|b| b.round > gc_round).for_each(|block_ref|{
254                    debug!(
255                        "Setting block {:?} as committed based on commit {:?}",
256                        block_ref,
257                        commit.index()
258                    );
259                    assert!(state.set_committed(block_ref), "Attempted to set again a block {:?} as committed when recovering commit {:?}", block_ref, commit);
260                });
261
262                // All commits are indexed starting from 1, so one reach zero exit.
263                index = index.saturating_sub(1);
264                if index == 0 {
265                    break;
266                }
267            }
268        }
269
270        // Recover hard linked statuses for blocks within GC round.
271        let proposed_blocks = store
272            .scan_blocks_by_author(context.own_index, state.gc_round() + 1)
273            .expect("Database error");
274        for block in proposed_blocks {
275            state.link_causal_history(block.reference());
276        }
277
278        state
279    }
280
281    /// Accepts a block into DagState and keeps it in memory.
282    pub(crate) fn accept_block(&mut self, block: VerifiedBlock) {
283        assert_ne!(
284            block.round(),
285            0,
286            "Genesis block should not be accepted into DAG."
287        );
288
289        let block_ref = block.reference();
290        if self.contains_block(&block_ref) {
291            return;
292        }
293
294        let now = self.context.clock.timestamp_utc_ms();
295        if block.timestamp_ms() > now {
296            trace!(
297                "Block {:?} with timestamp {} is greater than local timestamp {}.",
298                block,
299                block.timestamp_ms(),
300                now,
301            );
302        }
303        let hostname = &self.context.committee.authority(block_ref.author).hostname;
304        self.context
305            .metrics
306            .node_metrics
307            .accepted_block_time_drift_ms
308            .with_label_values(&[hostname])
309            .inc_by(block.timestamp_ms().saturating_sub(now));
310
311        // TODO: Move this check to core
312        // Ensure we don't write multiple blocks per slot for our own index
313        if block_ref.author == self.context.own_index {
314            let existing_blocks = self.get_uncommitted_blocks_at_slot(block_ref.into());
315            assert!(
316                existing_blocks.is_empty(),
317                "Block Rejected! Attempted to add block {block:#?} to own slot where \
318                block(s) {existing_blocks:#?} already exists."
319            );
320        }
321        self.update_block_metadata(&block);
322        self.blocks_to_write.push(block);
323        let source = if self.context.own_index == block_ref.author {
324            "own"
325        } else {
326            "others"
327        };
328        self.context
329            .metrics
330            .node_metrics
331            .accepted_blocks
332            .with_label_values(&[source])
333            .inc();
334    }
335
336    /// Updates internal metadata for a block.
337    fn update_block_metadata(&mut self, block: &VerifiedBlock) {
338        let block_ref = block.reference();
339        self.recent_blocks
340            .insert(block_ref, BlockInfo::new(block.clone()));
341        self.recent_refs_by_authority[block_ref.author].insert(block_ref);
342
343        if self.threshold_clock.add_block(block_ref) {
344            // Do not measure quorum delay when no local block is proposed in the round.
345            let last_proposed_block = self.get_last_proposed_block();
346            if last_proposed_block.round() == block_ref.round {
347                let quorum_delay_ms = self
348                    .context
349                    .clock
350                    .timestamp_utc_ms()
351                    .saturating_sub(self.get_last_proposed_block().timestamp_ms());
352                self.context
353                    .metrics
354                    .node_metrics
355                    .quorum_receive_latency
356                    .observe(Duration::from_millis(quorum_delay_ms).as_secs_f64());
357            }
358        }
359
360        self.highest_accepted_round = max(self.highest_accepted_round, block.round());
361        self.context
362            .metrics
363            .node_metrics
364            .highest_accepted_round
365            .set(self.highest_accepted_round as i64);
366
367        let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author]
368            .last()
369            .map(|block_ref| block_ref.round)
370            .expect("There should be by now at least one block ref");
371        let hostname = &self.context.committee.authority(block_ref.author).hostname;
372        self.context
373            .metrics
374            .node_metrics
375            .highest_accepted_authority_round
376            .with_label_values(&[hostname])
377            .set(highest_accepted_round_for_author as i64);
378    }
379
380    /// Accepts a blocks into DagState and keeps it in memory.
381    pub(crate) fn accept_blocks(&mut self, blocks: Vec<VerifiedBlock>) {
382        debug!(
383            "Accepting blocks: {}",
384            blocks.iter().map(|b| b.reference().to_string()).join(",")
385        );
386        for block in blocks {
387            self.accept_block(block);
388        }
389    }
390
391    /// Gets a block by checking cached recent blocks then storage.
392    /// Returns None when the block is not found.
393    pub(crate) fn get_block(&self, reference: &BlockRef) -> Option<VerifiedBlock> {
394        self.get_blocks(&[*reference])
395            .pop()
396            .expect("Exactly one element should be returned")
397    }
398
399    /// Gets blocks by checking genesis, cached recent blocks in memory, then storage.
400    /// An element is None when the corresponding block is not found.
401    pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
402        let mut blocks = vec![None; block_refs.len()];
403        let mut missing = Vec::new();
404
405        for (index, block_ref) in block_refs.iter().enumerate() {
406            if block_ref.round == GENESIS_ROUND {
407                // Allow the caller to handle the invalid genesis ancestor error.
408                if let Some(block) = self.genesis.get(block_ref) {
409                    blocks[index] = Some(block.clone());
410                }
411                continue;
412            }
413            if let Some(block_info) = self.recent_blocks.get(block_ref) {
414                blocks[index] = Some(block_info.block.clone());
415                continue;
416            }
417            missing.push((index, block_ref));
418        }
419
420        if missing.is_empty() {
421            return blocks;
422        }
423
424        let missing_refs = missing
425            .iter()
426            .map(|(_, block_ref)| **block_ref)
427            .collect::<Vec<_>>();
428        let store_results = self
429            .store
430            .read_blocks(&missing_refs)
431            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
432        self.context
433            .metrics
434            .node_metrics
435            .dag_state_store_read_count
436            .with_label_values(&["get_blocks"])
437            .inc();
438
439        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
440            blocks[index] = result;
441        }
442
443        blocks
444    }
445
446    /// Gets all uncommitted blocks in a slot.
447    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
448    pub(crate) fn get_uncommitted_blocks_at_slot(&self, slot: Slot) -> Vec<VerifiedBlock> {
449        // TODO: either panic below when the slot is at or below the last committed round,
450        // or support reading from storage while limiting storage reads to edge cases.
451
452        let mut blocks = vec![];
453        for (_block_ref, block_info) in self.recent_blocks.range((
454            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
455            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
456        )) {
457            blocks.push(block_info.block.clone())
458        }
459        blocks
460    }
461
462    /// Gets all uncommitted blocks in a round.
463    /// Uncommitted blocks must exist in memory, so only in-memory blocks are checked.
464    pub(crate) fn get_uncommitted_blocks_at_round(&self, round: Round) -> Vec<VerifiedBlock> {
465        if round <= self.last_commit_round() {
466            panic!("Round {} have committed blocks!", round);
467        }
468
469        let mut blocks = vec![];
470        for (_block_ref, block_info) in self.recent_blocks.range((
471            Included(BlockRef::new(round, AuthorityIndex::ZERO, BlockDigest::MIN)),
472            Excluded(BlockRef::new(
473                round + 1,
474                AuthorityIndex::ZERO,
475                BlockDigest::MIN,
476            )),
477        )) {
478            blocks.push(block_info.block.clone())
479        }
480        blocks
481    }
482
483    /// Gets all ancestors in the history of a block at a certain round.
484    pub(crate) fn ancestors_at_round(
485        &self,
486        later_block: &VerifiedBlock,
487        earlier_round: Round,
488    ) -> Vec<VerifiedBlock> {
489        // Iterate through ancestors of later_block in round descending order.
490        let mut linked: BTreeSet<BlockRef> = later_block.ancestors().iter().cloned().collect();
491        while !linked.is_empty() {
492            let round = linked.last().unwrap().round;
493            // Stop after finishing traversal for ancestors above earlier_round.
494            if round <= earlier_round {
495                break;
496            }
497            let block_ref = linked.pop_last().unwrap();
498            let Some(block) = self.get_block(&block_ref) else {
499                panic!("Block {:?} should exist in DAG!", block_ref);
500            };
501            linked.extend(block.ancestors().iter().cloned());
502        }
503        linked
504            .range((
505                Included(BlockRef::new(
506                    earlier_round,
507                    AuthorityIndex::ZERO,
508                    BlockDigest::MIN,
509                )),
510                Unbounded,
511            ))
512            .map(|r| {
513                self.get_block(r)
514                    .unwrap_or_else(|| panic!("Block {:?} should exist in DAG!", r))
515                    .clone()
516            })
517            .collect()
518    }
519
520    /// Gets the last proposed block from this authority.
521    /// If no block is proposed yet, returns the genesis block.
522    pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
523        self.get_last_block_for_authority(self.context.own_index)
524    }
525
526    /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
527    /// then the genesis block is returned as no other block has been received from that authority.
528    pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
529        if let Some(last) = self.recent_refs_by_authority[authority].last() {
530            return self
531                .recent_blocks
532                .get(last)
533                .expect("Block should be found in recent blocks")
534                .block
535                .clone();
536        }
537
538        // if none exists, then fallback to genesis
539        let (_, genesis_block) = self
540            .genesis
541            .iter()
542            .find(|(block_ref, _)| block_ref.author == authority)
543            .expect("Genesis should be found for authority {authority_index}");
544        genesis_block.clone()
545    }
546
547    /// Returns cached recent blocks from the specified authority.
548    /// Blocks returned are limited to round >= `start`, and cached.
549    /// NOTE: caller should not assume returned blocks are always chained.
550    /// "Disconnected" blocks can be returned when there are byzantine blocks,
551    /// or a previously evicted block is accepted again.
552    pub(crate) fn get_cached_blocks(
553        &self,
554        authority: AuthorityIndex,
555        start: Round,
556    ) -> Vec<VerifiedBlock> {
557        self.get_cached_blocks_in_range(authority, start, Round::MAX, usize::MAX)
558    }
559
560    // Retrieves the cached block within the range [start_round, end_round) from a given authority,
561    // limited in total number of blocks.
562    pub(crate) fn get_cached_blocks_in_range(
563        &self,
564        authority: AuthorityIndex,
565        start_round: Round,
566        end_round: Round,
567        limit: usize,
568    ) -> Vec<VerifiedBlock> {
569        if start_round >= end_round || limit == 0 {
570            return vec![];
571        }
572
573        let mut blocks = vec![];
574        for block_ref in self.recent_refs_by_authority[authority].range((
575            Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
576            Excluded(BlockRef::new(
577                end_round,
578                AuthorityIndex::MIN,
579                BlockDigest::MIN,
580            )),
581        )) {
582            let block_info = self
583                .recent_blocks
584                .get(block_ref)
585                .expect("Block should exist in recent blocks");
586            blocks.push(block_info.block.clone());
587            if blocks.len() >= limit {
588                break;
589            }
590        }
591        blocks
592    }
593
594    // Retrieves the last cached block within the range [start_round, end_round) from a given authority.
595    pub(crate) fn get_last_cached_block_in_range(
596        &self,
597        authority: AuthorityIndex,
598        start_round: Round,
599        end_round: Round,
600    ) -> Option<VerifiedBlock> {
601        if start_round >= end_round {
602            return None;
603        }
604
605        let block_ref = self.recent_refs_by_authority[authority]
606            .range((
607                Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
608                Excluded(BlockRef::new(
609                    end_round,
610                    AuthorityIndex::MIN,
611                    BlockDigest::MIN,
612                )),
613            ))
614            .last()?;
615
616        self.recent_blocks
617            .get(block_ref)
618            .map(|block_info| block_info.block.clone())
619    }
620
621    /// Returns the last block proposed per authority with `evicted round < round < end_round`.
622    /// The method is guaranteed to return results only when the `end_round` is not earlier of the
623    /// available cached data for each authority (evicted round + 1), otherwise the method will panic.
624    /// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
625    /// In case of equivocation for an authority's last slot, one block will be returned (the last in order)
626    /// and the other equivocating blocks will be returned.
627    pub(crate) fn get_last_cached_block_per_authority(
628        &self,
629        end_round: Round,
630    ) -> Vec<(VerifiedBlock, Vec<BlockRef>)> {
631        // Initialize with the genesis blocks as fallback
632        let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
633        let mut equivocating_blocks = vec![vec![]; self.context.committee.size()];
634
635        if end_round == GENESIS_ROUND {
636            panic!(
637                "Attempted to retrieve blocks earlier than the genesis round which is not possible"
638            );
639        }
640
641        if end_round == GENESIS_ROUND + 1 {
642            return blocks.into_iter().map(|b| (b, vec![])).collect();
643        }
644
645        for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() {
646            let authority_index = self
647                .context
648                .committee
649                .to_authority_index(authority_index)
650                .unwrap();
651
652            let last_evicted_round = self.evicted_rounds[authority_index];
653            if end_round.saturating_sub(1) <= last_evicted_round {
654                panic!(
655                    "Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}",
656                );
657            }
658
659            let block_ref_iter = block_refs
660                .range((
661                    Included(BlockRef::new(
662                        last_evicted_round + 1,
663                        authority_index,
664                        BlockDigest::MIN,
665                    )),
666                    Excluded(BlockRef::new(end_round, authority_index, BlockDigest::MIN)),
667                ))
668                .rev();
669
670            let mut last_round = 0;
671            for block_ref in block_ref_iter {
672                if last_round == 0 {
673                    last_round = block_ref.round;
674                    let block_info = self
675                        .recent_blocks
676                        .get(block_ref)
677                        .expect("Block should exist in recent blocks");
678                    blocks[authority_index] = block_info.block.clone();
679                    continue;
680                }
681                if block_ref.round < last_round {
682                    break;
683                }
684                equivocating_blocks[authority_index].push(*block_ref);
685            }
686        }
687
688        blocks.into_iter().zip(equivocating_blocks).collect()
689    }
690
691    /// Checks whether a block exists in the slot. The method checks only against the cached data.
692    /// If the user asks for a slot that is not within the cached data then a panic is thrown.
693    pub(crate) fn contains_cached_block_at_slot(&self, slot: Slot) -> bool {
694        // Always return true for genesis slots.
695        if slot.round == GENESIS_ROUND {
696            return true;
697        }
698
699        let eviction_round = self.evicted_rounds[slot.authority];
700        if slot.round <= eviction_round {
701            panic!(
702                "{}",
703                format!(
704                    "Attempted to check for slot {slot} that is <= the last evicted round {eviction_round}"
705                )
706            );
707        }
708
709        let mut result = self.recent_refs_by_authority[slot.authority].range((
710            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)),
711            Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)),
712        ));
713        result.next().is_some()
714    }
715
716    /// Checks whether the required blocks are in cache, if exist, or otherwise will check in store. The method is not caching
717    /// back the results, so its expensive if keep asking for cache missing blocks.
718    pub(crate) fn contains_blocks(&self, block_refs: Vec<BlockRef>) -> Vec<bool> {
719        let mut exist = vec![false; block_refs.len()];
720        let mut missing = Vec::new();
721
722        for (index, block_ref) in block_refs.into_iter().enumerate() {
723            let recent_refs = &self.recent_refs_by_authority[block_ref.author];
724            if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) {
725                exist[index] = true;
726            } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round
727            {
728                // Optimization: recent_refs contain the most recent blocks known to this authority.
729                // If a block ref is not found there and has a higher round, it definitely is
730                // missing from this authority and there is no need to check disk.
731                exist[index] = false;
732            } else {
733                missing.push((index, block_ref));
734            }
735        }
736
737        if missing.is_empty() {
738            return exist;
739        }
740
741        let missing_refs = missing
742            .iter()
743            .map(|(_, block_ref)| *block_ref)
744            .collect::<Vec<_>>();
745        let store_results = self
746            .store
747            .contains_blocks(&missing_refs)
748            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
749        self.context
750            .metrics
751            .node_metrics
752            .dag_state_store_read_count
753            .with_label_values(&["contains_blocks"])
754            .inc();
755
756        for ((index, _), result) in missing.into_iter().zip(store_results.into_iter()) {
757            exist[index] = result;
758        }
759
760        exist
761    }
762
763    pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
764        let blocks = self.contains_blocks(vec![*block_ref]);
765        blocks.first().cloned().unwrap()
766    }
767
768    // Sets the block as committed in the cache. If the block is set as committed for first time, then true is returned, otherwise false is returned instead.
769    // Method will panic if the block is not found in the cache.
770    pub(crate) fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
771        if let Some(block_info) = self.recent_blocks.get_mut(block_ref) {
772            if !block_info.committed {
773                block_info.committed = true;
774                return true;
775            }
776            false
777        } else {
778            panic!(
779                "Block {:?} not found in cache to set as committed.",
780                block_ref
781            );
782        }
783    }
784
785    /// Returns true if the block is committed. Only valid for blocks above the GC round.
786    pub(crate) fn is_committed(&self, block_ref: &BlockRef) -> bool {
787        self.recent_blocks
788            .get(block_ref)
789            .unwrap_or_else(|| panic!("Attempted to query for commit status for a block not in cached data {block_ref}"))
790            .committed
791    }
792
793    /// Recursively sets blocks in the causal history of the root block as hard linked, including the root block itself.
794    /// Returns the list of blocks that are newly linked.
795    /// The returned blocks are guaranteed to be above the GC round.
796    pub(crate) fn link_causal_history(&mut self, root_block: BlockRef) -> Vec<BlockRef> {
797        let gc_round = self.gc_round();
798        let mut linked_blocks = vec![];
799        let mut targets = VecDeque::new();
800        targets.push_back(root_block);
801        while let Some(block_ref) = targets.pop_front() {
802            // This is only correct with GC enabled.
803            if block_ref.round <= gc_round {
804                continue;
805            }
806            let block_info = self
807                .recent_blocks
808                .get_mut(&block_ref)
809                .unwrap_or_else(|| panic!("Block {:?} is not in DAG state", block_ref));
810            if block_info.hard_linked {
811                continue;
812            }
813            linked_blocks.push(block_ref);
814            block_info.hard_linked = true;
815            targets.extend(block_info.block.ancestors().iter());
816        }
817        linked_blocks
818    }
819
820    /// Returns true if the block is hard linked by a proposed block.
821    pub(crate) fn is_hard_linked(&self, block_ref: &BlockRef) -> bool {
822        self.recent_blocks
823            .get(block_ref)
824            .unwrap_or_else(|| panic!("Attempted to query for hard link status for a block not in cached data {block_ref}"))
825            .hard_linked
826    }
827
828    pub(crate) fn threshold_clock_round(&self) -> Round {
829        self.threshold_clock.get_round()
830    }
831
832    // The timestamp of when quorum threshold was last reached in the threshold clock.
833    pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
834        self.threshold_clock.get_quorum_ts()
835    }
836
837    pub(crate) fn highest_accepted_round(&self) -> Round {
838        self.highest_accepted_round
839    }
840
841    // Buffers a new commit in memory and updates last committed rounds.
842    // REQUIRED: must not skip over any commit index.
843    pub(crate) fn add_commit(&mut self, commit: TrustedCommit) {
844        let time_diff = if let Some(last_commit) = &self.last_commit {
845            if commit.index() <= last_commit.index() {
846                error!(
847                    "New commit index {} <= last commit index {}!",
848                    commit.index(),
849                    last_commit.index()
850                );
851                return;
852            }
853            assert_eq!(commit.index(), last_commit.index() + 1);
854
855            if commit.timestamp_ms() < last_commit.timestamp_ms() {
856                panic!(
857                    "Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}",
858                    last_commit, commit
859                );
860            }
861            commit
862                .timestamp_ms()
863                .saturating_sub(last_commit.timestamp_ms())
864        } else {
865            assert_eq!(commit.index(), 1);
866            0
867        };
868
869        self.context
870            .metrics
871            .node_metrics
872            .last_commit_time_diff
873            .observe(time_diff as f64);
874
875        let commit_round_advanced = if let Some(previous_commit) = &self.last_commit {
876            previous_commit.round() < commit.round()
877        } else {
878            true
879        };
880
881        self.last_commit = Some(commit.clone());
882
883        if commit_round_advanced {
884            let now = std::time::Instant::now();
885            if let Some(previous_time) = self.last_commit_round_advancement_time {
886                self.context
887                    .metrics
888                    .node_metrics
889                    .commit_round_advancement_interval
890                    .observe(now.duration_since(previous_time).as_secs_f64())
891            }
892            self.last_commit_round_advancement_time = Some(now);
893        }
894
895        for block_ref in commit.blocks().iter() {
896            self.last_committed_rounds[block_ref.author] = max(
897                self.last_committed_rounds[block_ref.author],
898                block_ref.round,
899            );
900        }
901
902        for (i, round) in self.last_committed_rounds.iter().enumerate() {
903            let index = self.context.committee.to_authority_index(i).unwrap();
904            let hostname = &self.context.committee.authority(index).hostname;
905            self.context
906                .metrics
907                .node_metrics
908                .last_committed_authority_round
909                .with_label_values(&[hostname])
910                .set((*round).into());
911        }
912
913        self.pending_commit_votes.push_back(commit.reference());
914        self.commits_to_write.push(commit);
915    }
916
917    /// Recovers commits to write from storage, at startup.
918    pub(crate) fn recover_commits_to_write(&mut self, commits: Vec<TrustedCommit>) {
919        self.commits_to_write.extend(commits);
920    }
921
922    pub(crate) fn ensure_commits_to_write_is_empty(&self) {
923        assert!(
924            self.commits_to_write.is_empty(),
925            "Commits to write should be empty. {:?}",
926            self.commits_to_write,
927        );
928    }
929
930    pub(crate) fn add_commit_info(&mut self, reputation_scores: ReputationScores) {
931        // We create an empty scoring subdag once reputation scores are calculated.
932        // Note: It is okay for this to not be gated by protocol config as the
933        // scoring_subdag should be empty in either case at this point.
934        assert!(self.scoring_subdag.is_empty());
935
936        let commit_info = CommitInfo {
937            committed_rounds: self.last_committed_rounds.clone(),
938            reputation_scores,
939        };
940        let last_commit = self
941            .last_commit
942            .as_ref()
943            .expect("Last commit should already be set.");
944        self.commit_info_to_write
945            .push((last_commit.reference(), commit_info));
946    }
947
948    pub(crate) fn add_finalized_commit(
949        &mut self,
950        commit_ref: CommitRef,
951        rejected_transactions: BTreeMap<BlockRef, Vec<TransactionIndex>>,
952    ) {
953        self.finalized_commits_to_write
954            .push((commit_ref, rejected_transactions));
955    }
956
957    pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
958        let mut votes = Vec::new();
959        while !self.pending_commit_votes.is_empty() && votes.len() < limit {
960            votes.push(self.pending_commit_votes.pop_front().unwrap());
961        }
962        votes
963    }
964
965    /// Index of the last commit.
966    pub(crate) fn last_commit_index(&self) -> CommitIndex {
967        match &self.last_commit {
968            Some(commit) => commit.index(),
969            None => 0,
970        }
971    }
972
973    /// Digest of the last commit.
974    pub(crate) fn last_commit_digest(&self) -> CommitDigest {
975        match &self.last_commit {
976            Some(commit) => commit.digest(),
977            None => CommitDigest::MIN,
978        }
979    }
980
981    /// Timestamp of the last commit.
982    pub(crate) fn last_commit_timestamp_ms(&self) -> BlockTimestampMs {
983        match &self.last_commit {
984            Some(commit) => commit.timestamp_ms(),
985            None => 0,
986        }
987    }
988
989    /// Leader slot of the last commit.
990    pub(crate) fn last_commit_leader(&self) -> Slot {
991        match &self.last_commit {
992            Some(commit) => commit.leader().into(),
993            None => self
994                .genesis
995                .iter()
996                .next()
997                .map(|(genesis_ref, _)| *genesis_ref)
998                .expect("Genesis blocks should always be available.")
999                .into(),
1000        }
1001    }
1002
1003    /// Highest round where a block is committed, which is last commit's leader round.
1004    pub(crate) fn last_commit_round(&self) -> Round {
1005        match &self.last_commit {
1006            Some(commit) => commit.leader().round,
1007            None => 0,
1008        }
1009    }
1010
1011    /// Last committed round per authority.
1012    pub(crate) fn last_committed_rounds(&self) -> Vec<Round> {
1013        self.last_committed_rounds.clone()
1014    }
1015
1016    /// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
1017    /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
1018    /// committed leader round. When GC is disabled that will return the genesis round.
1019    pub(crate) fn gc_round(&self) -> Round {
1020        self.calculate_gc_round(self.last_commit_round())
1021    }
1022
1023    /// Calculates the GC round from the input leader round, which can be different
1024    /// from the last committed leader round.
1025    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
1026        commit_round.saturating_sub(self.context.protocol_config.gc_depth())
1027    }
1028
1029    /// Flushes unpersisted blocks, commits and commit info to storage.
1030    ///
1031    /// REQUIRED: when buffering a block, all of its ancestors and the latest commit which sets the GC round
1032    /// must also be buffered.
1033    /// REQUIRED: when buffering a commit, all of its included blocks and the previous commits must also be buffered.
1034    /// REQUIRED: when flushing, all of the buffered blocks and commits must be flushed together to ensure consistency.
1035    ///
1036    /// After each flush, DagState becomes persisted in storage and it expected to recover
1037    /// all internal states from storage after restarts.
1038    pub(crate) fn flush(&mut self) {
1039        let _s = self
1040            .context
1041            .metrics
1042            .node_metrics
1043            .scope_processing_time
1044            .with_label_values(&["DagState::flush"])
1045            .start_timer();
1046
1047        // Flush buffered data to storage.
1048        let pending_blocks = std::mem::take(&mut self.blocks_to_write);
1049        let pending_commits = std::mem::take(&mut self.commits_to_write);
1050        let pending_commit_info = std::mem::take(&mut self.commit_info_to_write);
1051        let pending_finalized_commits = std::mem::take(&mut self.finalized_commits_to_write);
1052        if pending_blocks.is_empty()
1053            && pending_commits.is_empty()
1054            && pending_commit_info.is_empty()
1055            && pending_finalized_commits.is_empty()
1056        {
1057            return;
1058        }
1059
1060        debug!(
1061            "Flushing {} blocks ({}), {} commits ({}), {} commit infos ({}), {} finalized commits ({}) to storage.",
1062            pending_blocks.len(),
1063            pending_blocks
1064                .iter()
1065                .map(|b| b.reference().to_string())
1066                .join(","),
1067            pending_commits.len(),
1068            pending_commits
1069                .iter()
1070                .map(|c| c.reference().to_string())
1071                .join(","),
1072            pending_commit_info.len(),
1073            pending_commit_info
1074                .iter()
1075                .map(|(commit_ref, _)| commit_ref.to_string())
1076                .join(","),
1077            pending_finalized_commits.len(),
1078            pending_finalized_commits
1079                .iter()
1080                .map(|(commit_ref, _)| commit_ref.to_string())
1081                .join(","),
1082        );
1083        self.store
1084            .write(WriteBatch::new(
1085                pending_blocks,
1086                pending_commits,
1087                pending_commit_info,
1088                pending_finalized_commits,
1089            ))
1090            .unwrap_or_else(|e| panic!("Failed to write to storage: {:?}", e));
1091        self.context
1092            .metrics
1093            .node_metrics
1094            .dag_state_store_write_count
1095            .inc();
1096
1097        // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted.
1098        for (authority_index, _) in self.context.committee.authorities() {
1099            let eviction_round = self.calculate_authority_eviction_round(authority_index);
1100            while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() {
1101                if block_ref.round <= eviction_round {
1102                    self.recent_blocks.remove(block_ref);
1103                    self.recent_refs_by_authority[authority_index].pop_first();
1104                } else {
1105                    break;
1106                }
1107            }
1108            self.evicted_rounds[authority_index] = eviction_round;
1109        }
1110
1111        let metrics = &self.context.metrics.node_metrics;
1112        metrics
1113            .dag_state_recent_blocks
1114            .set(self.recent_blocks.len() as i64);
1115        metrics.dag_state_recent_refs.set(
1116            self.recent_refs_by_authority
1117                .iter()
1118                .map(BTreeSet::len)
1119                .sum::<usize>() as i64,
1120        );
1121    }
1122
1123    pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
1124        self.store
1125            .read_last_commit_info()
1126            .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
1127    }
1128
1129    pub(crate) fn add_scoring_subdags(&mut self, scoring_subdags: Vec<CommittedSubDag>) {
1130        self.scoring_subdag.add_subdags(scoring_subdags);
1131    }
1132
1133    pub(crate) fn clear_scoring_subdag(&mut self) {
1134        self.scoring_subdag.clear();
1135    }
1136
1137    pub(crate) fn scoring_subdags_count(&self) -> usize {
1138        self.scoring_subdag.scored_subdags_count()
1139    }
1140
1141    pub(crate) fn is_scoring_subdag_empty(&self) -> bool {
1142        self.scoring_subdag.is_empty()
1143    }
1144
1145    pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores {
1146        self.scoring_subdag.calculate_distributed_vote_scores()
1147    }
1148
1149    pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex {
1150        self.scoring_subdag
1151            .commit_range
1152            .as_ref()
1153            .expect("commit range should exist for scoring subdag")
1154            .end()
1155    }
1156
1157    /// The last round that should get evicted after a cache clean up operation. After this round we are
1158    /// guaranteed to have all the produced blocks from that authority. For any round that is
1159    /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
1160    fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round {
1161        let last_round = self.recent_refs_by_authority[authority_index]
1162            .last()
1163            .map(|block_ref| block_ref.round)
1164            .unwrap_or(GENESIS_ROUND);
1165
1166        Self::eviction_round(last_round, self.gc_round(), self.cached_rounds)
1167    }
1168
1169    /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds`
1170    /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible.
1171    fn eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round {
1172        gc_round.min(last_round.saturating_sub(cached_rounds))
1173    }
1174
1175    /// Returns the underlying store.
1176    pub(crate) fn store(&self) -> Arc<dyn Store> {
1177        self.store.clone()
1178    }
1179
1180    /// Detects and returns the blocks of the round that forms the last quorum. The method will return
1181    /// the quorum even if that's genesis.
1182    #[cfg(test)]
1183    pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
1184        // the quorum should exist either on the highest accepted round or the one before. If we fail to detect
1185        // a quorum then it means that our DAG has advanced with missing causal history.
1186        for round in
1187            (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
1188        {
1189            if round == GENESIS_ROUND {
1190                return self.genesis_blocks();
1191            }
1192            use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
1193            let mut quorum = StakeAggregator::<QuorumThreshold>::new();
1194
1195            // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
1196            let blocks = self.get_uncommitted_blocks_at_round(round);
1197            for block in &blocks {
1198                if quorum.add(block.author(), &self.context.committee) {
1199                    return blocks;
1200                }
1201            }
1202        }
1203
1204        panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
1205    }
1206
1207    #[cfg(test)]
1208    pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
1209        self.genesis.values().cloned().collect()
1210    }
1211
1212    #[cfg(test)]
1213    pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
1214        self.last_commit = Some(commit);
1215    }
1216}
1217
1218struct BlockInfo {
1219    block: VerifiedBlock,
1220    // Whether the block has been committed
1221    committed: bool,
1222    // Whether the block has been hard linked by a proposed block.
1223    hard_linked: bool,
1224}
1225
1226impl BlockInfo {
1227    fn new(block: VerifiedBlock) -> Self {
1228        Self {
1229            block,
1230            committed: false,
1231            hard_linked: false,
1232        }
1233    }
1234}
1235
1236#[cfg(test)]
1237mod test {
1238    use std::vec;
1239
1240    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};
1241    use parking_lot::RwLock;
1242
1243    use super::*;
1244    use crate::{
1245        block::{TestBlock, VerifiedBlock},
1246        storage::{WriteBatch, mem_store::MemStore},
1247        test_dag_builder::DagBuilder,
1248        test_dag_parser::parse_dag,
1249    };
1250
1251    #[tokio::test]
1252    async fn test_get_blocks() {
1253        let (context, _) = Context::new_for_test(4);
1254        let context = Arc::new(context);
1255        let store = Arc::new(MemStore::new());
1256        let mut dag_state = DagState::new(context.clone(), store.clone());
1257        let own_index = AuthorityIndex::new_for_test(0);
1258
1259        // Populate test blocks for round 1 ~ 10, authorities 0 ~ 2.
1260        let num_rounds: u32 = 10;
1261        let non_existent_round: u32 = 100;
1262        let num_authorities: u32 = 3;
1263        let num_blocks_per_slot: usize = 3;
1264        let mut blocks = BTreeMap::new();
1265        for round in 1..=num_rounds {
1266            for author in 0..num_authorities {
1267                // Create 3 blocks per slot, with different timestamps and digests.
1268                let base_ts = round as BlockTimestampMs * 1000;
1269                for timestamp in base_ts..base_ts + num_blocks_per_slot as u64 {
1270                    let block = VerifiedBlock::new_for_test(
1271                        TestBlock::new(round, author)
1272                            .set_timestamp_ms(timestamp)
1273                            .build(),
1274                    );
1275                    dag_state.accept_block(block.clone());
1276                    blocks.insert(block.reference(), block);
1277
1278                    // Only write one block per slot for own index
1279                    if AuthorityIndex::new_for_test(author) == own_index {
1280                        break;
1281                    }
1282                }
1283            }
1284        }
1285
1286        // Check uncommitted blocks that exist.
1287        for (r, block) in &blocks {
1288            assert_eq!(&dag_state.get_block(r).unwrap(), block);
1289        }
1290
1291        // Check uncommitted blocks that do not exist.
1292        let last_ref = blocks.keys().last().unwrap();
1293        assert!(
1294            dag_state
1295                .get_block(&BlockRef::new(
1296                    last_ref.round,
1297                    last_ref.author,
1298                    BlockDigest::MIN
1299                ))
1300                .is_none()
1301        );
1302
1303        // Check slots with uncommitted blocks.
1304        for round in 1..=num_rounds {
1305            for author in 0..num_authorities {
1306                let slot = Slot::new(
1307                    round,
1308                    context
1309                        .committee
1310                        .to_authority_index(author as usize)
1311                        .unwrap(),
1312                );
1313                let blocks = dag_state.get_uncommitted_blocks_at_slot(slot);
1314
1315                // We only write one block per slot for own index
1316                if AuthorityIndex::new_for_test(author) == own_index {
1317                    assert_eq!(blocks.len(), 1);
1318                } else {
1319                    assert_eq!(blocks.len(), num_blocks_per_slot);
1320                }
1321
1322                for b in blocks {
1323                    assert_eq!(b.round(), round);
1324                    assert_eq!(
1325                        b.author(),
1326                        context
1327                            .committee
1328                            .to_authority_index(author as usize)
1329                            .unwrap()
1330                    );
1331                }
1332            }
1333        }
1334
1335        // Check slots without uncommitted blocks.
1336        let slot = Slot::new(non_existent_round, AuthorityIndex::ZERO);
1337        assert!(dag_state.get_uncommitted_blocks_at_slot(slot).is_empty());
1338
1339        // Check rounds with uncommitted blocks.
1340        for round in 1..=num_rounds {
1341            let blocks = dag_state.get_uncommitted_blocks_at_round(round);
1342            // Expect 3 blocks per authority except for own authority which should
1343            // have 1 block.
1344            assert_eq!(
1345                blocks.len(),
1346                (num_authorities - 1) as usize * num_blocks_per_slot + 1
1347            );
1348            for b in blocks {
1349                assert_eq!(b.round(), round);
1350            }
1351        }
1352
1353        // Check rounds without uncommitted blocks.
1354        assert!(
1355            dag_state
1356                .get_uncommitted_blocks_at_round(non_existent_round)
1357                .is_empty()
1358        );
1359    }
1360
1361    #[tokio::test]
1362    async fn test_ancestors_at_uncommitted_round() {
1363        // Initialize DagState.
1364        let (context, _) = Context::new_for_test(4);
1365        let context = Arc::new(context);
1366        let store = Arc::new(MemStore::new());
1367        let mut dag_state = DagState::new(context.clone(), store.clone());
1368
1369        // Populate DagState.
1370
1371        // Round 10 refs will not have their blocks in DagState.
1372        let round_10_refs: Vec<_> = (0..4)
1373            .map(|a| {
1374                VerifiedBlock::new_for_test(TestBlock::new(10, a).set_timestamp_ms(1000).build())
1375                    .reference()
1376            })
1377            .collect();
1378
1379        // Round 11 blocks.
1380        let round_11 = vec![
1381            // This will connect to round 12.
1382            VerifiedBlock::new_for_test(
1383                TestBlock::new(11, 0)
1384                    .set_timestamp_ms(1100)
1385                    .set_ancestors(round_10_refs.clone())
1386                    .build(),
1387            ),
1388            // Slot(11, 1) has 3 blocks.
1389            // This will connect to round 12.
1390            VerifiedBlock::new_for_test(
1391                TestBlock::new(11, 1)
1392                    .set_timestamp_ms(1110)
1393                    .set_ancestors(round_10_refs.clone())
1394                    .build(),
1395            ),
1396            // This will connect to round 13.
1397            VerifiedBlock::new_for_test(
1398                TestBlock::new(11, 1)
1399                    .set_timestamp_ms(1111)
1400                    .set_ancestors(round_10_refs.clone())
1401                    .build(),
1402            ),
1403            // This will not connect to any block.
1404            VerifiedBlock::new_for_test(
1405                TestBlock::new(11, 1)
1406                    .set_timestamp_ms(1112)
1407                    .set_ancestors(round_10_refs.clone())
1408                    .build(),
1409            ),
1410            // This will not connect to any block.
1411            VerifiedBlock::new_for_test(
1412                TestBlock::new(11, 2)
1413                    .set_timestamp_ms(1120)
1414                    .set_ancestors(round_10_refs.clone())
1415                    .build(),
1416            ),
1417            // This will connect to round 12.
1418            VerifiedBlock::new_for_test(
1419                TestBlock::new(11, 3)
1420                    .set_timestamp_ms(1130)
1421                    .set_ancestors(round_10_refs.clone())
1422                    .build(),
1423            ),
1424        ];
1425
1426        // Round 12 blocks.
1427        let ancestors_for_round_12 = vec![
1428            round_11[0].reference(),
1429            round_11[1].reference(),
1430            round_11[5].reference(),
1431        ];
1432        let round_12 = vec![
1433            VerifiedBlock::new_for_test(
1434                TestBlock::new(12, 0)
1435                    .set_timestamp_ms(1200)
1436                    .set_ancestors(ancestors_for_round_12.clone())
1437                    .build(),
1438            ),
1439            VerifiedBlock::new_for_test(
1440                TestBlock::new(12, 2)
1441                    .set_timestamp_ms(1220)
1442                    .set_ancestors(ancestors_for_round_12.clone())
1443                    .build(),
1444            ),
1445            VerifiedBlock::new_for_test(
1446                TestBlock::new(12, 3)
1447                    .set_timestamp_ms(1230)
1448                    .set_ancestors(ancestors_for_round_12.clone())
1449                    .build(),
1450            ),
1451        ];
1452
1453        // Round 13 blocks.
1454        let ancestors_for_round_13 = vec![
1455            round_12[0].reference(),
1456            round_12[1].reference(),
1457            round_12[2].reference(),
1458            round_11[2].reference(),
1459        ];
1460        let round_13 = vec![
1461            VerifiedBlock::new_for_test(
1462                TestBlock::new(12, 1)
1463                    .set_timestamp_ms(1300)
1464                    .set_ancestors(ancestors_for_round_13.clone())
1465                    .build(),
1466            ),
1467            VerifiedBlock::new_for_test(
1468                TestBlock::new(12, 2)
1469                    .set_timestamp_ms(1320)
1470                    .set_ancestors(ancestors_for_round_13.clone())
1471                    .build(),
1472            ),
1473            VerifiedBlock::new_for_test(
1474                TestBlock::new(12, 3)
1475                    .set_timestamp_ms(1330)
1476                    .set_ancestors(ancestors_for_round_13.clone())
1477                    .build(),
1478            ),
1479        ];
1480
1481        // Round 14 anchor block.
1482        let ancestors_for_round_14 = round_13.iter().map(|b| b.reference()).collect();
1483        let anchor = VerifiedBlock::new_for_test(
1484            TestBlock::new(14, 1)
1485                .set_timestamp_ms(1410)
1486                .set_ancestors(ancestors_for_round_14)
1487                .build(),
1488        );
1489
1490        // Add all blocks (at and above round 11) to DagState.
1491        for b in round_11
1492            .iter()
1493            .chain(round_12.iter())
1494            .chain(round_13.iter())
1495            .chain([anchor.clone()].iter())
1496        {
1497            dag_state.accept_block(b.clone());
1498        }
1499
1500        // Check ancestors connected to anchor.
1501        let ancestors = dag_state.ancestors_at_round(&anchor, 11);
1502        let mut ancestors_refs: Vec<BlockRef> = ancestors.iter().map(|b| b.reference()).collect();
1503        ancestors_refs.sort();
1504        let mut expected_refs = vec![
1505            round_11[0].reference(),
1506            round_11[1].reference(),
1507            round_11[2].reference(),
1508            round_11[5].reference(),
1509        ];
1510        expected_refs.sort(); // we need to sort as blocks with same author and round of round 11 (position 1 & 2) might not be in right lexicographical order.
1511        assert_eq!(
1512            ancestors_refs, expected_refs,
1513            "Expected round 11 ancestors: {:?}. Got: {:?}",
1514            expected_refs, ancestors_refs
1515        );
1516    }
1517
1518    #[tokio::test]
1519    async fn test_link_causal_history() {
1520        let (mut context, _) = Context::new_for_test(4);
1521        context.parameters.dag_state_cached_rounds = 10;
1522        context
1523            .protocol_config
1524            .set_consensus_gc_depth_for_testing(3);
1525        let context = Arc::new(context);
1526
1527        let store = Arc::new(MemStore::new());
1528        let mut dag_state = DagState::new(context.clone(), store.clone());
1529
1530        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1531        let mut dag_builder = DagBuilder::new(context.clone());
1532        dag_builder.layers(1..=3).build();
1533        dag_builder
1534            .layers(4..=6)
1535            .authorities(vec![AuthorityIndex::new_for_test(0)])
1536            .skip_block()
1537            .build();
1538
1539        // Accept all blocks
1540        let all_blocks = dag_builder.all_blocks();
1541        dag_state.accept_blocks(all_blocks.clone());
1542
1543        // No block is linked yet.
1544        for block in &all_blocks {
1545            assert!(!dag_state.is_hard_linked(&block.reference()));
1546        }
1547
1548        // Link causal history from a round 1 block.
1549        let round_1_block = &all_blocks[1];
1550        assert_eq!(round_1_block.round(), 1);
1551        let linked_blocks = dag_state.link_causal_history(round_1_block.reference());
1552
1553        // Check that the block is linked.
1554        assert_eq!(linked_blocks.len(), 1);
1555        assert_eq!(linked_blocks[0], round_1_block.reference());
1556        for block_ref in linked_blocks {
1557            assert!(dag_state.is_hard_linked(&block_ref));
1558        }
1559
1560        // Link causal history from a round 2 block.
1561        let round_2_block = &all_blocks[4];
1562        assert_eq!(round_2_block.round(), 2);
1563        let linked_blocks = dag_state.link_causal_history(round_2_block.reference());
1564
1565        // Check the linked blocks.
1566        assert_eq!(linked_blocks.len(), 4);
1567        for block_ref in linked_blocks {
1568            assert!(block_ref == round_2_block.reference() || block_ref.round == 1);
1569        }
1570
1571        // Check linked status in dag state.
1572        for block in &all_blocks {
1573            if block.round() == 1 || block.reference() == round_2_block.reference() {
1574                assert!(dag_state.is_hard_linked(&block.reference()));
1575            } else {
1576                assert!(!dag_state.is_hard_linked(&block.reference()));
1577            }
1578        }
1579
1580        // Select round 6 block.
1581        let round_6_block = all_blocks.last().unwrap();
1582        assert_eq!(round_6_block.round(), 6);
1583
1584        // Get GC round to 3.
1585        let last_commit = TrustedCommit::new_for_test(
1586            6,
1587            CommitDigest::MIN,
1588            context.clock.timestamp_utc_ms(),
1589            round_6_block.reference(),
1590            vec![],
1591        );
1592        dag_state.set_last_commit(last_commit);
1593        assert_eq!(
1594            dag_state.gc_round(),
1595            3,
1596            "GC round should have moved to round 3"
1597        );
1598
1599        // Link causal history from a round 6 block.
1600        let linked_blocks = dag_state.link_causal_history(round_6_block.reference());
1601
1602        // Check the linked blocks. They should not include GC'ed blocks.
1603        assert_eq!(linked_blocks.len(), 7, "Linked blocks: {:?}", linked_blocks);
1604        for block_ref in linked_blocks {
1605            assert!(
1606                block_ref.round == 4
1607                    || block_ref.round == 5
1608                    || block_ref == round_6_block.reference()
1609            );
1610        }
1611
1612        // Check linked status in dag state.
1613        for block in &all_blocks {
1614            let block_ref = block.reference();
1615            if block.round() == 1
1616                || block_ref == round_2_block.reference()
1617                || block_ref.round == 4
1618                || block_ref.round == 5
1619                || block_ref == round_6_block.reference()
1620            {
1621                assert!(dag_state.is_hard_linked(&block.reference()));
1622            } else {
1623                assert!(!dag_state.is_hard_linked(&block.reference()));
1624            }
1625        }
1626    }
1627
1628    #[tokio::test]
1629    async fn test_contains_blocks_in_cache_or_store() {
1630        /// Only keep elements up to 2 rounds before the last committed round
1631        const CACHED_ROUNDS: Round = 2;
1632
1633        let (mut context, _) = Context::new_for_test(4);
1634        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1635
1636        let context = Arc::new(context);
1637        let store = Arc::new(MemStore::new());
1638        let mut dag_state = DagState::new(context.clone(), store.clone());
1639
1640        // Create test blocks for round 1 ~ 10
1641        let num_rounds: u32 = 10;
1642        let num_authorities: u32 = 4;
1643        let mut blocks = Vec::new();
1644
1645        for round in 1..=num_rounds {
1646            for author in 0..num_authorities {
1647                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1648                blocks.push(block);
1649            }
1650        }
1651
1652        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1653        blocks.clone().into_iter().for_each(|block| {
1654            if block.round() <= 4 {
1655                store
1656                    .write(WriteBatch::default().blocks(vec![block]))
1657                    .unwrap();
1658            } else {
1659                dag_state.accept_blocks(vec![block]);
1660            }
1661        });
1662
1663        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1664        // where the blocks of first 4 round should be found in DagState and the rest in store.
1665        let mut block_refs = blocks
1666            .iter()
1667            .map(|block| block.reference())
1668            .collect::<Vec<_>>();
1669        let result = dag_state.contains_blocks(block_refs.clone());
1670
1671        // Ensure everything is found
1672        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1673        assert_eq!(result, expected);
1674
1675        // Now try to ask also for one block ref that is neither in cache nor in store
1676        block_refs.insert(
1677            3,
1678            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1679        );
1680        let result = dag_state.contains_blocks(block_refs.clone());
1681
1682        // Then all should be found apart from the last one
1683        expected.insert(3, false);
1684        assert_eq!(result, expected.clone());
1685    }
1686
1687    #[tokio::test]
1688    async fn test_contains_cached_block_at_slot() {
1689        /// Only keep elements up to 2 rounds before the last committed round
1690        const CACHED_ROUNDS: Round = 2;
1691
1692        let num_authorities: u32 = 4;
1693        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1694        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1695
1696        let context = Arc::new(context);
1697        let store = Arc::new(MemStore::new());
1698        let mut dag_state = DagState::new(context.clone(), store.clone());
1699
1700        // Create test blocks for round 1 ~ 10
1701        let num_rounds: u32 = 10;
1702        let mut blocks = Vec::new();
1703
1704        for round in 1..=num_rounds {
1705            for author in 0..num_authorities {
1706                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1707                blocks.push(block.clone());
1708                dag_state.accept_block(block);
1709            }
1710        }
1711
1712        // Query for genesis round 0, genesis blocks should be returned
1713        for (author, _) in context.committee.authorities() {
1714            assert!(
1715                dag_state.contains_cached_block_at_slot(Slot::new(GENESIS_ROUND, author)),
1716                "Genesis should always be found"
1717            );
1718        }
1719
1720        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1721        // where the blocks of first 4 round should be found in DagState and the rest in store.
1722        let mut block_refs = blocks
1723            .iter()
1724            .map(|block| block.reference())
1725            .collect::<Vec<_>>();
1726
1727        for block_ref in block_refs.clone() {
1728            let slot = block_ref.into();
1729            let found = dag_state.contains_cached_block_at_slot(slot);
1730            assert!(found, "A block should be found at slot {}", slot);
1731        }
1732
1733        // Now try to ask also for one block ref that is not in cache
1734        // Then all should be found apart from the last one
1735        block_refs.insert(
1736            3,
1737            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1738        );
1739        let mut expected = vec![true; (num_rounds * num_authorities) as usize];
1740        expected.insert(3, false);
1741
1742        // Attempt to check the same for via the contains slot method
1743        for block_ref in block_refs {
1744            let slot = block_ref.into();
1745            let found = dag_state.contains_cached_block_at_slot(slot);
1746
1747            assert_eq!(expected.remove(0), found);
1748        }
1749    }
1750
1751    #[tokio::test]
1752    #[ignore]
1753    #[should_panic(
1754        expected = "Attempted to check for slot [1]3 that is <= the last gc evicted round 3"
1755    )]
1756    async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range() {
1757        /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold
1758        /// for the correct node operation.
1759        const GC_DEPTH: u32 = 2;
1760        /// Keep at least 3 rounds in cache for each authority.
1761        const CACHED_ROUNDS: Round = 3;
1762
1763        let (mut context, _) = Context::new_for_test(4);
1764        context
1765            .protocol_config
1766            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1767        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1768
1769        let context = Arc::new(context);
1770        let store = Arc::new(MemStore::new());
1771        let mut dag_state = DagState::new(context.clone(), store.clone());
1772
1773        // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6.
1774        let mut dag_builder = DagBuilder::new(context.clone());
1775        dag_builder.layers(1..=3).build();
1776        dag_builder
1777            .layers(4..=6)
1778            .authorities(vec![AuthorityIndex::new_for_test(0)])
1779            .skip_block()
1780            .build();
1781
1782        // Accept all blocks
1783        dag_builder
1784            .all_blocks()
1785            .into_iter()
1786            .for_each(|block| dag_state.accept_block(block));
1787
1788        // Now add a commit for leader round 5 to trigger an eviction
1789        dag_state.add_commit(TrustedCommit::new_for_test(
1790            1 as CommitIndex,
1791            CommitDigest::MIN,
1792            0,
1793            dag_builder.leader_block(5).unwrap().reference(),
1794            vec![],
1795        ));
1796        // Flush the DAG state to storage.
1797        dag_state.flush();
1798
1799        // Ensure that gc round has been updated
1800        assert_eq!(dag_state.gc_round(), 3, "GC round should be 3");
1801
1802        // Now what we expect to happen is for:
1803        // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards.
1804        // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds.
1805        for authority_index in 1..=3 {
1806            for round in 4..=6 {
1807                assert!(dag_state.contains_cached_block_at_slot(Slot::new(
1808                    round,
1809                    AuthorityIndex::new_for_test(authority_index)
1810                )));
1811            }
1812        }
1813
1814        for round in 1..=3 {
1815            assert!(
1816                dag_state.contains_cached_block_at_slot(Slot::new(
1817                    round,
1818                    AuthorityIndex::new_for_test(0)
1819                ))
1820            );
1821        }
1822
1823        // When trying to request for authority 1 at block slot 3 it should panic, as anything
1824        // that is <= 3 should be evicted
1825        let _ =
1826            dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1)));
1827    }
1828
1829    #[tokio::test]
1830    async fn test_get_blocks_in_cache_or_store() {
1831        let (context, _) = Context::new_for_test(4);
1832        let context = Arc::new(context);
1833        let store = Arc::new(MemStore::new());
1834        let mut dag_state = DagState::new(context.clone(), store.clone());
1835
1836        // Create test blocks for round 1 ~ 10
1837        let num_rounds: u32 = 10;
1838        let num_authorities: u32 = 4;
1839        let mut blocks = Vec::new();
1840
1841        for round in 1..=num_rounds {
1842            for author in 0..num_authorities {
1843                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
1844                blocks.push(block);
1845            }
1846        }
1847
1848        // Now write in store the blocks from first 4 rounds and the rest to the dag state
1849        blocks.clone().into_iter().for_each(|block| {
1850            if block.round() <= 4 {
1851                store
1852                    .write(WriteBatch::default().blocks(vec![block]))
1853                    .unwrap();
1854            } else {
1855                dag_state.accept_blocks(vec![block]);
1856            }
1857        });
1858
1859        // Now when trying to query whether we have all the blocks, we should successfully retrieve a positive answer
1860        // where the blocks of first 4 round should be found in DagState and the rest in store.
1861        let mut block_refs = blocks
1862            .iter()
1863            .map(|block| block.reference())
1864            .collect::<Vec<_>>();
1865        let result = dag_state.get_blocks(&block_refs);
1866
1867        let mut expected = blocks
1868            .into_iter()
1869            .map(Some)
1870            .collect::<Vec<Option<VerifiedBlock>>>();
1871
1872        // Ensure everything is found
1873        assert_eq!(result, expected.clone());
1874
1875        // Now try to ask also for one block ref that is neither in cache nor in store
1876        block_refs.insert(
1877            3,
1878            BlockRef::new(11, AuthorityIndex::new_for_test(3), BlockDigest::default()),
1879        );
1880        let result = dag_state.get_blocks(&block_refs);
1881
1882        // Then all should be found apart from the last one
1883        expected.insert(3, None);
1884        assert_eq!(result, expected);
1885    }
1886
1887    #[tokio::test]
1888    async fn test_flush_and_recovery() {
1889        telemetry_subscribers::init_for_testing();
1890
1891        const GC_DEPTH: u32 = 3;
1892        const CACHED_ROUNDS: u32 = 4;
1893
1894        let num_authorities: u32 = 4;
1895        let (mut context, _) = Context::new_for_test(num_authorities as usize);
1896        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
1897        context
1898            .protocol_config
1899            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1900
1901        let context = Arc::new(context);
1902
1903        let store = Arc::new(MemStore::new());
1904        let mut dag_state = DagState::new(context.clone(), store.clone());
1905
1906        const NUM_ROUNDS: Round = 20;
1907        let mut dag_builder = DagBuilder::new(context.clone());
1908        dag_builder.layers(1..=5).build();
1909        dag_builder
1910            .layers(6..=8)
1911            .authorities(vec![AuthorityIndex::new_for_test(0)])
1912            .skip_block()
1913            .build();
1914        dag_builder.layers(9..=NUM_ROUNDS).build();
1915
1916        // Get all commits from the DAG builder.
1917        const LAST_COMMIT_ROUND: Round = 16;
1918        const LAST_COMMIT_INDEX: CommitIndex = 15;
1919        let commits = dag_builder
1920            .get_sub_dag_and_commits(1..=NUM_ROUNDS)
1921            .into_iter()
1922            .map(|(_subdag, commit)| commit)
1923            .take(LAST_COMMIT_INDEX as usize)
1924            .collect::<Vec<_>>();
1925        assert_eq!(commits.len(), LAST_COMMIT_INDEX as usize);
1926        assert_eq!(commits.last().unwrap().round(), LAST_COMMIT_ROUND);
1927
1928        // Add the blocks from first 11 rounds and first 8 commits to the dag state
1929        // Note that the commit of round 8 is missing because where authority 0 is the leader but produced no block.
1930        // So commit 8 has leader round 9.
1931        const PERSISTED_BLOCK_ROUNDS: u32 = 12;
1932        const NUM_PERSISTED_COMMITS: usize = 8;
1933        const LAST_PERSISTED_COMMIT_ROUND: Round = 9;
1934        const LAST_PERSISTED_COMMIT_INDEX: CommitIndex = 8;
1935        dag_state.accept_blocks(dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS));
1936        let mut finalized_commits = vec![];
1937        for commit in commits.iter().take(NUM_PERSISTED_COMMITS).cloned() {
1938            finalized_commits.push(commit.clone());
1939            dag_state.add_commit(commit);
1940        }
1941        let last_finalized_commit = finalized_commits.last().unwrap();
1942        assert_eq!(last_finalized_commit.round(), LAST_PERSISTED_COMMIT_ROUND);
1943        assert_eq!(last_finalized_commit.index(), LAST_PERSISTED_COMMIT_INDEX);
1944
1945        // Collect finalized blocks.
1946        let finalized_blocks = finalized_commits
1947            .iter()
1948            .flat_map(|commit| commit.blocks())
1949            .collect::<BTreeSet<_>>();
1950
1951        // Flush commits from the dag state
1952        dag_state.flush();
1953
1954        // Verify the store has blocks up to round 12, and commits up to index 8.
1955        let store_blocks = store
1956            .scan_blocks_by_author(AuthorityIndex::new_for_test(1), 1)
1957            .unwrap();
1958        assert_eq!(store_blocks.last().unwrap().round(), PERSISTED_BLOCK_ROUNDS);
1959        let store_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1960        assert_eq!(store_commits.len(), NUM_PERSISTED_COMMITS);
1961        assert_eq!(
1962            store_commits.last().unwrap().index(),
1963            LAST_PERSISTED_COMMIT_INDEX
1964        );
1965        assert_eq!(
1966            store_commits.last().unwrap().round(),
1967            LAST_PERSISTED_COMMIT_ROUND
1968        );
1969
1970        // Add the rest of the blocks and commits to the dag state
1971        dag_state.accept_blocks(dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS));
1972        for commit in commits.iter().skip(NUM_PERSISTED_COMMITS).cloned() {
1973            dag_state.add_commit(commit);
1974        }
1975
1976        // All blocks should be found in DagState.
1977        let all_blocks = dag_builder.blocks(1..=NUM_ROUNDS);
1978        let block_refs = all_blocks
1979            .iter()
1980            .map(|block| block.reference())
1981            .collect::<Vec<_>>();
1982        let result = dag_state
1983            .get_blocks(&block_refs)
1984            .into_iter()
1985            .map(|b| b.unwrap())
1986            .collect::<Vec<_>>();
1987        assert_eq!(result, all_blocks);
1988
1989        // Last commit index from DagState should now be 15
1990        assert_eq!(dag_state.last_commit_index(), LAST_COMMIT_INDEX);
1991
1992        // Destroy the dag state without flushing additional data.
1993        drop(dag_state);
1994
1995        // Recover the state from the store
1996        let dag_state = DagState::new(context.clone(), store.clone());
1997
1998        // Persisted blocks rounds should be found in DagState.
1999        let all_blocks = dag_builder.blocks(1..=PERSISTED_BLOCK_ROUNDS);
2000        let block_refs = all_blocks
2001            .iter()
2002            .map(|block| block.reference())
2003            .collect::<Vec<_>>();
2004        let result = dag_state
2005            .get_blocks(&block_refs)
2006            .into_iter()
2007            .map(|b| b.unwrap())
2008            .collect::<Vec<_>>();
2009        assert_eq!(result, all_blocks);
2010
2011        // Unpersisted blocks should not be in DagState, because they are not flushed.
2012        let missing_blocks = dag_builder.blocks(PERSISTED_BLOCK_ROUNDS + 1..=NUM_ROUNDS);
2013        let block_refs = missing_blocks
2014            .iter()
2015            .map(|block| block.reference())
2016            .collect::<Vec<_>>();
2017        let retrieved_blocks = dag_state
2018            .get_blocks(&block_refs)
2019            .into_iter()
2020            .flatten()
2021            .collect::<Vec<_>>();
2022        assert!(retrieved_blocks.is_empty());
2023
2024        // Recovered last commit index and round should be 8 and 9.
2025        assert_eq!(dag_state.last_commit_index(), LAST_PERSISTED_COMMIT_INDEX);
2026        assert_eq!(dag_state.last_commit_round(), LAST_PERSISTED_COMMIT_ROUND);
2027
2028        // The last_commit_rounds of the finalized commits should have been recovered.
2029        let expected_last_committed_rounds = vec![5, 9, 8, 8];
2030        assert_eq!(
2031            dag_state.last_committed_rounds(),
2032            expected_last_committed_rounds
2033        );
2034        // Unscored subdags will be recovered based on the flushed commits and no commit info.
2035        assert_eq!(dag_state.scoring_subdags_count(), NUM_PERSISTED_COMMITS);
2036
2037        // Ensure that cached blocks exist only for specific rounds per authority
2038        for (authority_index, _) in context.committee.authorities() {
2039            let blocks = dag_state.get_cached_blocks(authority_index, 1);
2040
2041            // Ensure that eviction rounds have been properly recovered.
2042            // For every authority, the gc round is 9 - 3 = 6, and cached round is 12-5 = 7.
2043            // So eviction round is the min which is 6.
2044            if authority_index == AuthorityIndex::new_for_test(0) {
2045                assert_eq!(blocks.len(), 4);
2046                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2047                assert!(
2048                    blocks
2049                        .into_iter()
2050                        .all(|block| block.round() >= 7 && block.round() <= 12)
2051                );
2052            } else {
2053                assert_eq!(blocks.len(), 6);
2054                assert_eq!(dag_state.evicted_rounds[authority_index.value()], 6);
2055                assert!(
2056                    blocks
2057                        .into_iter()
2058                        .all(|block| block.round() >= 7 && block.round() <= 12)
2059                );
2060            }
2061        }
2062
2063        // Ensure that committed blocks from > gc_round have been correctly recovered as committed according to committed sub dags.
2064        let gc_round = dag_state.gc_round();
2065        assert_eq!(gc_round, 6);
2066        dag_state
2067            .recent_blocks
2068            .iter()
2069            .for_each(|(block_ref, block_info)| {
2070                if block_ref.round > gc_round && finalized_blocks.contains(block_ref) {
2071                    assert!(
2072                        block_info.committed,
2073                        "Block {:?} should be set as committed",
2074                        block_ref
2075                    );
2076                }
2077            });
2078
2079        // Ensure the hard linked status of blocks are recovered.
2080        // All blocks below highest accepted round, or authority 0 round 12 block, should be hard linked.
2081        // Other blocks (round 12 but not from authority 0) should not be hard linked.
2082        // This is because authority 0 blocks are considered proposed blocks.
2083        dag_state
2084            .recent_blocks
2085            .iter()
2086            .for_each(|(block_ref, block_info)| {
2087                if block_ref.round < PERSISTED_BLOCK_ROUNDS || block_ref.author.value() == 0 {
2088                    assert!(block_info.hard_linked);
2089                } else {
2090                    assert!(!block_info.hard_linked);
2091                }
2092            });
2093    }
2094
2095    #[tokio::test]
2096    async fn test_block_info_as_committed() {
2097        let num_authorities: u32 = 4;
2098        let (context, _) = Context::new_for_test(num_authorities as usize);
2099        let context = Arc::new(context);
2100
2101        let store = Arc::new(MemStore::new());
2102        let mut dag_state = DagState::new(context.clone(), store.clone());
2103
2104        // Accept a block
2105        let block = VerifiedBlock::new_for_test(
2106            TestBlock::new(1, 0)
2107                .set_timestamp_ms(1000)
2108                .set_ancestors(vec![])
2109                .build(),
2110        );
2111
2112        dag_state.accept_block(block.clone());
2113
2114        // Query is committed
2115        assert!(!dag_state.is_committed(&block.reference()));
2116
2117        // Set block as committed for first time should return true
2118        assert!(
2119            dag_state.set_committed(&block.reference()),
2120            "Block should be successfully set as committed for first time"
2121        );
2122
2123        // Now it should appear as committed
2124        assert!(dag_state.is_committed(&block.reference()));
2125
2126        // Trying to set the block as committed again, it should return false.
2127        assert!(
2128            !dag_state.set_committed(&block.reference()),
2129            "Block should not be successfully set as committed"
2130        );
2131    }
2132
2133    #[tokio::test]
2134    async fn test_get_cached_blocks() {
2135        let (mut context, _) = Context::new_for_test(4);
2136        context.parameters.dag_state_cached_rounds = 5;
2137
2138        let context = Arc::new(context);
2139        let store = Arc::new(MemStore::new());
2140        let mut dag_state = DagState::new(context.clone(), store.clone());
2141
2142        // Create no blocks for authority 0
2143        // Create one block (round 10) for authority 1
2144        // Create two blocks (rounds 10,11) for authority 2
2145        // Create three blocks (rounds 10,11,12) for authority 3
2146        let mut all_blocks = Vec::new();
2147        for author in 1..=3 {
2148            for round in 10..(10 + author) {
2149                let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
2150                all_blocks.push(block.clone());
2151                dag_state.accept_block(block);
2152            }
2153        }
2154
2155        // Test get_cached_blocks()
2156
2157        let cached_blocks =
2158            dag_state.get_cached_blocks(context.committee.to_authority_index(0).unwrap(), 0);
2159        assert!(cached_blocks.is_empty());
2160
2161        let cached_blocks =
2162            dag_state.get_cached_blocks(context.committee.to_authority_index(1).unwrap(), 10);
2163        assert_eq!(cached_blocks.len(), 1);
2164        assert_eq!(cached_blocks[0].round(), 10);
2165
2166        let cached_blocks =
2167            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 10);
2168        assert_eq!(cached_blocks.len(), 2);
2169        assert_eq!(cached_blocks[0].round(), 10);
2170        assert_eq!(cached_blocks[1].round(), 11);
2171
2172        let cached_blocks =
2173            dag_state.get_cached_blocks(context.committee.to_authority_index(2).unwrap(), 11);
2174        assert_eq!(cached_blocks.len(), 1);
2175        assert_eq!(cached_blocks[0].round(), 11);
2176
2177        let cached_blocks =
2178            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 10);
2179        assert_eq!(cached_blocks.len(), 3);
2180        assert_eq!(cached_blocks[0].round(), 10);
2181        assert_eq!(cached_blocks[1].round(), 11);
2182        assert_eq!(cached_blocks[2].round(), 12);
2183
2184        let cached_blocks =
2185            dag_state.get_cached_blocks(context.committee.to_authority_index(3).unwrap(), 12);
2186        assert_eq!(cached_blocks.len(), 1);
2187        assert_eq!(cached_blocks[0].round(), 12);
2188
2189        // Test get_cached_blocks_in_range()
2190
2191        // Start == end
2192        let cached_blocks = dag_state.get_cached_blocks_in_range(
2193            context.committee.to_authority_index(3).unwrap(),
2194            10,
2195            10,
2196            1,
2197        );
2198        assert!(cached_blocks.is_empty());
2199
2200        // Start > end
2201        let cached_blocks = dag_state.get_cached_blocks_in_range(
2202            context.committee.to_authority_index(3).unwrap(),
2203            11,
2204            10,
2205            1,
2206        );
2207        assert!(cached_blocks.is_empty());
2208
2209        // Empty result.
2210        let cached_blocks = dag_state.get_cached_blocks_in_range(
2211            context.committee.to_authority_index(0).unwrap(),
2212            9,
2213            10,
2214            1,
2215        );
2216        assert!(cached_blocks.is_empty());
2217
2218        // Single block, one round before the end.
2219        let cached_blocks = dag_state.get_cached_blocks_in_range(
2220            context.committee.to_authority_index(1).unwrap(),
2221            9,
2222            11,
2223            1,
2224        );
2225        assert_eq!(cached_blocks.len(), 1);
2226        assert_eq!(cached_blocks[0].round(), 10);
2227
2228        // Respect end round.
2229        let cached_blocks = dag_state.get_cached_blocks_in_range(
2230            context.committee.to_authority_index(2).unwrap(),
2231            9,
2232            12,
2233            5,
2234        );
2235        assert_eq!(cached_blocks.len(), 2);
2236        assert_eq!(cached_blocks[0].round(), 10);
2237        assert_eq!(cached_blocks[1].round(), 11);
2238
2239        // Respect start round.
2240        let cached_blocks = dag_state.get_cached_blocks_in_range(
2241            context.committee.to_authority_index(3).unwrap(),
2242            11,
2243            20,
2244            5,
2245        );
2246        assert_eq!(cached_blocks.len(), 2);
2247        assert_eq!(cached_blocks[0].round(), 11);
2248        assert_eq!(cached_blocks[1].round(), 12);
2249
2250        // Respect limit
2251        let cached_blocks = dag_state.get_cached_blocks_in_range(
2252            context.committee.to_authority_index(3).unwrap(),
2253            10,
2254            20,
2255            1,
2256        );
2257        assert_eq!(cached_blocks.len(), 1);
2258        assert_eq!(cached_blocks[0].round(), 10);
2259    }
2260
2261    #[tokio::test]
2262    async fn test_get_last_cached_block() {
2263        // GIVEN
2264        const CACHED_ROUNDS: Round = 2;
2265        const GC_DEPTH: u32 = 1;
2266        let (mut context, _) = Context::new_for_test(4);
2267        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2268        context
2269            .protocol_config
2270            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2271
2272        let context = Arc::new(context);
2273        let store = Arc::new(MemStore::new());
2274        let mut dag_state = DagState::new(context.clone(), store.clone());
2275
2276        // Create no blocks for authority 0
2277        // Create one block (round 1) for authority 1
2278        // Create two blocks (rounds 1,2) for authority 2
2279        // Create three blocks (rounds 1,2,3) for authority 3
2280        let dag_str = "DAG {
2281            Round 0 : { 4 },
2282            Round 1 : {
2283                B -> [*],
2284                C -> [*],
2285                D -> [*],
2286            },
2287            Round 2 : {
2288                C -> [*],
2289                D -> [*],
2290            },
2291            Round 3 : {
2292                D -> [*],
2293            },
2294        }";
2295
2296        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2297
2298        // Add equivocating block for round 2 authority 3
2299        let block = VerifiedBlock::new_for_test(TestBlock::new(2, 2).build());
2300
2301        // Accept all blocks
2302        for block in dag_builder
2303            .all_blocks()
2304            .into_iter()
2305            .chain(std::iter::once(block))
2306        {
2307            dag_state.accept_block(block);
2308        }
2309
2310        dag_state.add_commit(TrustedCommit::new_for_test(
2311            1 as CommitIndex,
2312            CommitDigest::MIN,
2313            context.clock.timestamp_utc_ms(),
2314            dag_builder.leader_block(3).unwrap().reference(),
2315            vec![],
2316        ));
2317
2318        // WHEN search for the latest blocks
2319        let end_round = 4;
2320        let expected_rounds = vec![0, 1, 2, 3];
2321        let expected_excluded_and_equivocating_blocks = vec![0, 0, 1, 0];
2322        // THEN
2323        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2324        assert_eq!(
2325            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2326            expected_rounds
2327        );
2328        assert_eq!(
2329            last_blocks.iter().map(|b| b.1.len()).collect::<Vec<_>>(),
2330            expected_excluded_and_equivocating_blocks
2331        );
2332
2333        // THEN
2334        for (i, expected_round) in expected_rounds.iter().enumerate() {
2335            let round = dag_state
2336                .get_last_cached_block_in_range(
2337                    context.committee.to_authority_index(i).unwrap(),
2338                    0,
2339                    end_round,
2340                )
2341                .map(|b| b.round())
2342                .unwrap_or_default();
2343            assert_eq!(round, *expected_round, "Authority {i}");
2344        }
2345
2346        // WHEN starting from round 2
2347        let start_round = 2;
2348        let expected_rounds = [0, 0, 2, 3];
2349
2350        // THEN
2351        for (i, expected_round) in expected_rounds.iter().enumerate() {
2352            let round = dag_state
2353                .get_last_cached_block_in_range(
2354                    context.committee.to_authority_index(i).unwrap(),
2355                    start_round,
2356                    end_round,
2357                )
2358                .map(|b| b.round())
2359                .unwrap_or_default();
2360            assert_eq!(round, *expected_round, "Authority {i}");
2361        }
2362
2363        // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
2364        // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
2365        //
2366        // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep
2367        // all their highest round blocks for CACHED_ROUNDS.
2368        dag_state.flush();
2369
2370        // AND we request before round 3
2371        let end_round = 3;
2372        let expected_rounds = vec![0, 1, 2, 2];
2373
2374        // THEN
2375        let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
2376        assert_eq!(
2377            last_blocks.iter().map(|b| b.0.round()).collect::<Vec<_>>(),
2378            expected_rounds
2379        );
2380
2381        // THEN
2382        for (i, expected_round) in expected_rounds.iter().enumerate() {
2383            let round = dag_state
2384                .get_last_cached_block_in_range(
2385                    context.committee.to_authority_index(i).unwrap(),
2386                    0,
2387                    end_round,
2388                )
2389                .map(|b| b.round())
2390                .unwrap_or_default();
2391            assert_eq!(round, *expected_round, "Authority {i}");
2392        }
2393    }
2394
2395    #[tokio::test]
2396    #[should_panic(
2397        expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority [2]"
2398    )]
2399    async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
2400        // GIVEN
2401        const CACHED_ROUNDS: Round = 1;
2402        const GC_DEPTH: u32 = 1;
2403        let (mut context, _) = Context::new_for_test(4);
2404        context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;
2405        context
2406            .protocol_config
2407            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2408
2409        let context = Arc::new(context);
2410        let store = Arc::new(MemStore::new());
2411        let mut dag_state = DagState::new(context.clone(), store.clone());
2412
2413        // Create no blocks for authority 0
2414        // Create one block (round 1) for authority 1
2415        // Create two blocks (rounds 1,2) for authority 2
2416        // Create three blocks (rounds 1,2,3) for authority 3
2417        let mut dag_builder = DagBuilder::new(context.clone());
2418        dag_builder
2419            .layers(1..=1)
2420            .authorities(vec![AuthorityIndex::new_for_test(0)])
2421            .skip_block()
2422            .build();
2423        dag_builder
2424            .layers(2..=2)
2425            .authorities(vec![
2426                AuthorityIndex::new_for_test(0),
2427                AuthorityIndex::new_for_test(1),
2428            ])
2429            .skip_block()
2430            .build();
2431        dag_builder
2432            .layers(3..=3)
2433            .authorities(vec![
2434                AuthorityIndex::new_for_test(0),
2435                AuthorityIndex::new_for_test(1),
2436                AuthorityIndex::new_for_test(2),
2437            ])
2438            .skip_block()
2439            .build();
2440
2441        // Accept all blocks
2442        for block in dag_builder.all_blocks() {
2443            dag_state.accept_block(block);
2444        }
2445
2446        dag_state.add_commit(TrustedCommit::new_for_test(
2447            1 as CommitIndex,
2448            CommitDigest::MIN,
2449            0,
2450            dag_builder.leader_block(3).unwrap().reference(),
2451            vec![],
2452        ));
2453
2454        // Flush the store so we update the evict rounds
2455        dag_state.flush();
2456
2457        // THEN the method should panic, as some authorities have already evicted rounds <= round 2
2458        dag_state.get_last_cached_block_per_authority(2);
2459    }
2460
2461    #[tokio::test]
2462    async fn test_last_quorum() {
2463        // GIVEN
2464        let (context, _) = Context::new_for_test(4);
2465        let context = Arc::new(context);
2466        let store = Arc::new(MemStore::new());
2467        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2468
2469        // WHEN no blocks exist then genesis should be returned
2470        {
2471            let genesis = genesis_blocks(context.as_ref());
2472
2473            assert_eq!(dag_state.read().last_quorum(), genesis);
2474        }
2475
2476        // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
2477        {
2478            let mut dag_builder = DagBuilder::new(context.clone());
2479            dag_builder
2480                .layers(1..=4)
2481                .build()
2482                .persist_layers(dag_state.clone());
2483            let round_4_blocks: Vec<_> = dag_builder
2484                .blocks(4..=4)
2485                .into_iter()
2486                .map(|block| block.reference())
2487                .collect();
2488
2489            let last_quorum = dag_state.read().last_quorum();
2490
2491            assert_eq!(
2492                last_quorum
2493                    .into_iter()
2494                    .map(|block| block.reference())
2495                    .collect::<Vec<_>>(),
2496                round_4_blocks
2497            );
2498        }
2499
2500        // WHEN adding one more block at round 5, still round 4 should be returned as quorum
2501        {
2502            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2503            dag_state.write().accept_block(block);
2504
2505            let round_4_blocks = dag_state.read().get_uncommitted_blocks_at_round(4);
2506
2507            let last_quorum = dag_state.read().last_quorum();
2508
2509            assert_eq!(last_quorum, round_4_blocks);
2510        }
2511    }
2512
2513    #[tokio::test]
2514    async fn test_last_block_for_authority() {
2515        // GIVEN
2516        let (context, _) = Context::new_for_test(4);
2517        let context = Arc::new(context);
2518        let store = Arc::new(MemStore::new());
2519        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2520
2521        // WHEN no blocks exist then genesis should be returned
2522        {
2523            let genesis = genesis_blocks(context.as_ref());
2524            let my_genesis = genesis
2525                .into_iter()
2526                .find(|block| block.author() == context.own_index)
2527                .unwrap();
2528
2529            assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
2530        }
2531
2532        // WHEN adding some blocks for authorities, only the last ones should be returned
2533        {
2534            // add blocks up to round 4
2535            let mut dag_builder = DagBuilder::new(context.clone());
2536            dag_builder
2537                .layers(1..=4)
2538                .build()
2539                .persist_layers(dag_state.clone());
2540
2541            // add block 5 for authority 0
2542            let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
2543            dag_state.write().accept_block(block);
2544
2545            let block = dag_state
2546                .read()
2547                .get_last_block_for_authority(AuthorityIndex::new_for_test(0));
2548            assert_eq!(block.round(), 5);
2549
2550            for (authority_index, _) in context.committee.authorities() {
2551                let block = dag_state
2552                    .read()
2553                    .get_last_block_for_authority(authority_index);
2554
2555                if authority_index.value() == 0 {
2556                    assert_eq!(block.round(), 5);
2557                } else {
2558                    assert_eq!(block.round(), 4);
2559                }
2560            }
2561        }
2562    }
2563
2564    #[tokio::test]
2565    async fn test_accept_block_not_panics_when_timestamp_is_ahead_and_median_timestamp() {
2566        // GIVEN
2567        let (context, _) = Context::new_for_test(4);
2568        let context = Arc::new(context);
2569        let store = Arc::new(MemStore::new());
2570        let mut dag_state = DagState::new(context.clone(), store.clone());
2571
2572        // Set a timestamp for the block that is ahead of the current time
2573        let block_timestamp = context.clock.timestamp_utc_ms() + 5_000;
2574
2575        let block = VerifiedBlock::new_for_test(
2576            TestBlock::new(10, 0)
2577                .set_timestamp_ms(block_timestamp)
2578                .build(),
2579        );
2580
2581        // Try to accept the block - it should not panic
2582        dag_state.accept_block(block);
2583    }
2584
2585    #[tokio::test]
2586    async fn test_last_finalized_commit() {
2587        // GIVEN
2588        let (context, _) = Context::new_for_test(4);
2589        let context = Arc::new(context);
2590        let store = Arc::new(MemStore::new());
2591        let mut dag_state = DagState::new(context.clone(), store.clone());
2592
2593        // WHEN adding a finalized commit
2594        let commit_ref = CommitRef::new(1, CommitDigest::MIN);
2595        let rejected_transactions = BTreeMap::new();
2596        dag_state.add_finalized_commit(commit_ref, rejected_transactions.clone());
2597
2598        // THEN the commit should be added to the buffer
2599        assert_eq!(dag_state.finalized_commits_to_write.len(), 1);
2600        assert_eq!(
2601            dag_state.finalized_commits_to_write[0],
2602            (commit_ref, rejected_transactions.clone())
2603        );
2604
2605        // WHEN flushing the DAG state
2606        dag_state.flush();
2607
2608        // THEN the commit and rejected transactions should be written to storage
2609        let last_finalized_commit = store.read_last_finalized_commit().unwrap();
2610        assert_eq!(last_finalized_commit, Some(commit_ref));
2611        let stored_rejected_transactions = store
2612            .read_rejected_transactions(commit_ref)
2613            .unwrap()
2614            .unwrap();
2615        assert_eq!(stored_rejected_transactions, rejected_transactions);
2616    }
2617}