consensus_core/storage/
mem_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, VecDeque},
6    ops::Bound::Included,
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
11use parking_lot::RwLock;
12
13use super::{Store, WriteBatch};
14use crate::{
15    block::{BlockAPI as _, VerifiedBlock},
16    commit::{
17        CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
18        TrustedCommit,
19    },
20    error::ConsensusResult,
21};
22
23/// In-memory storage for testing.
24pub struct MemStore {
25    inner: RwLock<Inner>,
26}
27
28struct Inner {
29    blocks: BTreeMap<(Round, AuthorityIndex, BlockDigest), VerifiedBlock>,
30    digests_by_authorities: BTreeSet<(AuthorityIndex, Round, BlockDigest)>,
31    commits: BTreeMap<(CommitIndex, CommitDigest), TrustedCommit>,
32    commit_votes: BTreeSet<(CommitIndex, CommitDigest, BlockRef)>,
33    commit_info: BTreeMap<(CommitIndex, CommitDigest), CommitInfo>,
34    finalized_commits:
35        BTreeMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
36}
37
38impl MemStore {
39    pub fn new() -> Self {
40        MemStore {
41            inner: RwLock::new(Inner {
42                blocks: BTreeMap::new(),
43                digests_by_authorities: BTreeSet::new(),
44                commits: BTreeMap::new(),
45                commit_votes: BTreeSet::new(),
46                commit_info: BTreeMap::new(),
47                finalized_commits: BTreeMap::new(),
48            }),
49        }
50    }
51}
52
53impl Default for MemStore {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl Store for MemStore {
60    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
61        let mut inner = self.inner.write();
62
63        for block in write_batch.blocks {
64            let block_ref = block.reference();
65            inner.blocks.insert(
66                (block_ref.round, block_ref.author, block_ref.digest),
67                block.clone(),
68            );
69            inner.digests_by_authorities.insert((
70                block_ref.author,
71                block_ref.round,
72                block_ref.digest,
73            ));
74            for vote in block.commit_votes() {
75                inner
76                    .commit_votes
77                    .insert((vote.index, vote.digest, block_ref));
78            }
79        }
80
81        for commit in write_batch.commits {
82            inner
83                .commits
84                .insert((commit.index(), commit.digest()), commit);
85        }
86
87        for (commit_ref, commit_info) in write_batch.commit_info {
88            inner
89                .commit_info
90                .insert((commit_ref.index, commit_ref.digest), commit_info);
91        }
92
93        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
94            inner
95                .finalized_commits
96                .insert((commit_ref.index, commit_ref.digest), rejected_transactions);
97        }
98
99        Ok(())
100    }
101
102    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
103        let inner = self.inner.read();
104        let blocks = refs
105            .iter()
106            .map(|r| inner.blocks.get(&(r.round, r.author, r.digest)).cloned())
107            .collect();
108        Ok(blocks)
109    }
110
111    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
112        let inner = self.inner.read();
113        let exist = refs
114            .iter()
115            .map(|r| inner.blocks.contains_key(&(r.round, r.author, r.digest)))
116            .collect();
117        Ok(exist)
118    }
119
120    fn scan_blocks_by_author(
121        &self,
122        author: AuthorityIndex,
123        start_round: Round,
124    ) -> ConsensusResult<Vec<VerifiedBlock>> {
125        let inner = self.inner.read();
126        let mut refs = vec![];
127        for &(author, round, digest) in inner.digests_by_authorities.range((
128            Included((author, start_round, BlockDigest::MIN)),
129            Included((author, Round::MAX, BlockDigest::MAX)),
130        )) {
131            refs.push(BlockRef::new(round, author, digest));
132        }
133        let results = self.read_blocks(refs.as_slice())?;
134        let mut blocks = vec![];
135        for (r, block) in refs.into_iter().zip(results.into_iter()) {
136            if let Some(block) = block {
137                blocks.push(block);
138            } else {
139                panic!("Block {:?} not found!", r);
140            }
141        }
142        Ok(blocks)
143    }
144
145    fn scan_last_blocks_by_author(
146        &self,
147        author: AuthorityIndex,
148        num_of_rounds: u64,
149        before_round: Option<Round>,
150    ) -> ConsensusResult<Vec<VerifiedBlock>> {
151        let before_round = before_round.unwrap_or(Round::MAX);
152        let mut refs = VecDeque::new();
153        for &(author, round, digest) in self
154            .inner
155            .read()
156            .digests_by_authorities
157            .range((
158                Included((author, Round::MIN, BlockDigest::MIN)),
159                Included((author, before_round, BlockDigest::MAX)),
160            ))
161            .rev()
162            .take(num_of_rounds as usize)
163        {
164            refs.push_front(BlockRef::new(round, author, digest));
165        }
166        let results = self.read_blocks(refs.as_slices().0)?;
167        let mut blocks = vec![];
168        for (r, block) in refs.into_iter().zip(results.into_iter()) {
169            blocks.push(
170                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
171            );
172        }
173        Ok(blocks)
174    }
175
176    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
177        let inner = self.inner.read();
178        Ok(inner
179            .commits
180            .last_key_value()
181            .map(|(_, commit)| commit.clone()))
182    }
183
184    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
185        let inner = self.inner.read();
186        let mut commits = vec![];
187        for (_, commit) in inner.commits.range((
188            Included((range.start(), CommitDigest::MIN)),
189            Included((range.end(), CommitDigest::MAX)),
190        )) {
191            commits.push(commit.clone());
192        }
193        Ok(commits)
194    }
195
196    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
197        let inner = self.inner.read();
198        let votes = inner
199            .commit_votes
200            .range((
201                Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
202                Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
203            ))
204            .map(|(_, _, block_ref)| *block_ref)
205            .collect();
206        Ok(votes)
207    }
208
209    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
210        let inner = self.inner.read();
211        Ok(inner
212            .commit_info
213            .last_key_value()
214            .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
215    }
216
217    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
218        let inner = self.inner.read();
219        Ok(inner
220            .finalized_commits
221            .last_key_value()
222            .map(|(k, _)| CommitRef::new(k.0, k.1)))
223    }
224
225    fn read_rejected_transactions(
226        &self,
227        commit_ref: CommitRef,
228    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
229        let inner = self.inner.read();
230        Ok(inner
231            .finalized_commits
232            .get(&(commit_ref.index, commit_ref.digest))
233            .cloned())
234    }
235}