consensus_core/storage/
mem_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    ops::Bound::{Excluded, Included},
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
11use mysten_common::ZipDebugEqIteratorExt;
12use parking_lot::RwLock;
13
14use super::{Store, WriteBatch};
15use crate::{
16    block::{BlockAPI as _, VerifiedBlock},
17    commit::{
18        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
19        TrustedCommit,
20    },
21    error::ConsensusResult,
22};
23
/// In-memory storage for testing.
///
/// All state is kept in a single `Inner` value behind one
/// `parking_lot::RwLock`, so every `Store` method acquires that lock for
/// reading or writing before touching any collection.
pub struct MemStore {
    // The one lock guarding every in-memory collection of the store.
    inner: RwLock<Inner>,
}
28
/// The collections backing `MemStore`. All of them are ordered (`BTreeMap` /
/// `BTreeSet`) so that range scans over key prefixes are possible.
struct Inner {
    // Blocks keyed by (round, author, digest).
    blocks: BTreeMap<(Round, AuthorityIndex, BlockDigest), VerifiedBlock>,
    // Secondary index over the same blocks, ordered by author first so that
    // all blocks of one authority can be scanned by round.
    digests_by_authorities: BTreeSet<(AuthorityIndex, Round, BlockDigest)>,
    // Commits keyed by (commit index, commit digest).
    commits: BTreeMap<(CommitIndex, CommitDigest), TrustedCommit>,
    // One entry per commit vote found in a stored block:
    // (voted commit index, voted commit digest, reference of the voting block).
    commit_votes: BTreeSet<(CommitIndex, CommitDigest, BlockRef)>,
    // Commit info keyed by (commit index, commit digest).
    commit_info: BTreeMap<(CommitIndex, CommitDigest), CommitInfo>,
    // Finalized commits keyed by (commit index, commit digest); the value is the
    // per-block list of rejected transaction indices recorded for that commit.
    finalized_commits:
        BTreeMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
38
39impl MemStore {
40    pub fn new() -> Self {
41        MemStore {
42            inner: RwLock::new(Inner {
43                blocks: BTreeMap::new(),
44                digests_by_authorities: BTreeSet::new(),
45                commits: BTreeMap::new(),
46                commit_votes: BTreeSet::new(),
47                commit_info: BTreeMap::new(),
48                finalized_commits: BTreeMap::new(),
49            }),
50        }
51    }
52}
53
54impl Default for MemStore {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl Store for MemStore {
61    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
62        let mut inner = self.inner.write();
63
64        for block in write_batch.blocks {
65            let block_ref = block.reference();
66            inner.blocks.insert(
67                (block_ref.round, block_ref.author, block_ref.digest),
68                block.clone(),
69            );
70            inner.digests_by_authorities.insert((
71                block_ref.author,
72                block_ref.round,
73                block_ref.digest,
74            ));
75            for vote in block.commit_votes() {
76                inner
77                    .commit_votes
78                    .insert((vote.index, vote.digest, block_ref));
79            }
80        }
81
82        for commit in write_batch.commits {
83            inner
84                .commits
85                .insert((commit.index(), commit.digest()), commit);
86        }
87
88        for (commit_ref, commit_info) in write_batch.commit_info {
89            inner
90                .commit_info
91                .insert((commit_ref.index, commit_ref.digest), commit_info);
92        }
93
94        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
95            inner
96                .finalized_commits
97                .insert((commit_ref.index, commit_ref.digest), rejected_transactions);
98        }
99
100        Ok(())
101    }
102
103    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
104        let inner = self.inner.read();
105        let blocks = refs
106            .iter()
107            .map(|r| inner.blocks.get(&(r.round, r.author, r.digest)).cloned())
108            .collect();
109        Ok(blocks)
110    }
111
112    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
113        let inner = self.inner.read();
114        let exist = refs
115            .iter()
116            .map(|r| inner.blocks.contains_key(&(r.round, r.author, r.digest)))
117            .collect();
118        Ok(exist)
119    }
120
121    fn scan_blocks_by_author(
122        &self,
123        author: AuthorityIndex,
124        start_round: Round,
125    ) -> ConsensusResult<Vec<VerifiedBlock>> {
126        self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
127    }
128
129    fn scan_blocks_by_author_in_range(
130        &self,
131        author: AuthorityIndex,
132        start_round: Round,
133        end_round: Round,
134        limit: usize,
135    ) -> ConsensusResult<Vec<VerifiedBlock>> {
136        let inner = self.inner.read();
137        let mut refs = vec![];
138        for &(author, round, digest) in inner.digests_by_authorities.range((
139            Included((author, start_round, BlockDigest::MIN)),
140            Excluded((author, end_round, BlockDigest::MIN)),
141        )) {
142            refs.push(BlockRef::new(round, author, digest));
143            if refs.len() >= limit {
144                break;
145            }
146        }
147        let results = self.read_blocks(refs.as_slice())?;
148        let mut blocks = vec![];
149        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
150            if let Some(block) = block {
151                blocks.push(block);
152            } else {
153                panic!("Block {:?} not found!", r);
154            }
155        }
156        Ok(blocks)
157    }
158
159    fn scan_last_blocks_by_author(
160        &self,
161        author: AuthorityIndex,
162        num_of_rounds: u64,
163        before_round: Option<Round>,
164    ) -> ConsensusResult<Vec<VerifiedBlock>> {
165        let before_round = before_round.unwrap_or(Round::MAX);
166        let mut refs = VecDeque::new();
167        for &(author, round, digest) in self
168            .inner
169            .read()
170            .digests_by_authorities
171            .range((
172                Included((author, Round::MIN, BlockDigest::MIN)),
173                Included((author, before_round, BlockDigest::MAX)),
174            ))
175            .rev()
176            .take(num_of_rounds as usize)
177        {
178            refs.push_front(BlockRef::new(round, author, digest));
179        }
180        let refs_slice = refs.make_contiguous();
181        let results = self.read_blocks(refs_slice)?;
182        let mut blocks = vec![];
183        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
184            blocks.push(
185                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
186            );
187        }
188        Ok(blocks)
189    }
190
191    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
192        let inner = self.inner.read();
193        Ok(inner
194            .commits
195            .last_key_value()
196            .map(|(_, commit)| commit.clone()))
197    }
198
199    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
200        let inner = self.inner.read();
201        let mut commits = vec![];
202        for (_, commit) in inner.commits.range((
203            Included((range.start(), CommitDigest::MIN)),
204            Included((range.end(), CommitDigest::MAX)),
205        )) {
206            commits.push(commit.clone());
207        }
208        Ok(commits)
209    }
210
211    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
212        let inner = self.inner.read();
213        let votes = inner
214            .commit_votes
215            .range((
216                Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
217                Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
218            ))
219            .map(|(_, _, block_ref)| *block_ref)
220            .collect();
221        Ok(votes)
222    }
223
224    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
225        let inner = self.inner.read();
226        Ok(inner
227            .commit_info
228            .last_key_value()
229            .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
230    }
231
232    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
233        let inner = self.inner.read();
234        Ok(inner
235            .finalized_commits
236            .last_key_value()
237            .map(|(k, _)| CommitRef::new(k.0, k.1)))
238    }
239
240    fn read_rejected_transactions(
241        &self,
242        commit_ref: CommitRef,
243    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
244        let inner = self.inner.read();
245        Ok(inner
246            .finalized_commits
247            .get(&(commit_ref.index, commit_ref.digest))
248            .cloned())
249    }
250}