1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 ops::Bound::Included,
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
11use parking_lot::RwLock;
12
13use super::{Store, WriteBatch};
14use crate::{
15 block::{BlockAPI as _, VerifiedBlock},
16 commit::{
17 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
18 TrustedCommit,
19 },
20 error::ConsensusResult,
21};
22
23pub struct MemStore {
25 inner: RwLock<Inner>,
26}
27
28struct Inner {
29 blocks: BTreeMap<(Round, AuthorityIndex, BlockDigest), VerifiedBlock>,
30 digests_by_authorities: BTreeSet<(AuthorityIndex, Round, BlockDigest)>,
31 commits: BTreeMap<(CommitIndex, CommitDigest), TrustedCommit>,
32 commit_votes: BTreeSet<(CommitIndex, CommitDigest, BlockRef)>,
33 commit_info: BTreeMap<(CommitIndex, CommitDigest), CommitInfo>,
34 finalized_commits:
35 BTreeMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
36}
37
38impl MemStore {
39 pub fn new() -> Self {
40 MemStore {
41 inner: RwLock::new(Inner {
42 blocks: BTreeMap::new(),
43 digests_by_authorities: BTreeSet::new(),
44 commits: BTreeMap::new(),
45 commit_votes: BTreeSet::new(),
46 commit_info: BTreeMap::new(),
47 finalized_commits: BTreeMap::new(),
48 }),
49 }
50 }
51}
52
53impl Default for MemStore {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl Store for MemStore {
60 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
61 let mut inner = self.inner.write();
62
63 for block in write_batch.blocks {
64 let block_ref = block.reference();
65 inner.blocks.insert(
66 (block_ref.round, block_ref.author, block_ref.digest),
67 block.clone(),
68 );
69 inner.digests_by_authorities.insert((
70 block_ref.author,
71 block_ref.round,
72 block_ref.digest,
73 ));
74 for vote in block.commit_votes() {
75 inner
76 .commit_votes
77 .insert((vote.index, vote.digest, block_ref));
78 }
79 }
80
81 for commit in write_batch.commits {
82 inner
83 .commits
84 .insert((commit.index(), commit.digest()), commit);
85 }
86
87 for (commit_ref, commit_info) in write_batch.commit_info {
88 inner
89 .commit_info
90 .insert((commit_ref.index, commit_ref.digest), commit_info);
91 }
92
93 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
94 inner
95 .finalized_commits
96 .insert((commit_ref.index, commit_ref.digest), rejected_transactions);
97 }
98
99 Ok(())
100 }
101
102 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
103 let inner = self.inner.read();
104 let blocks = refs
105 .iter()
106 .map(|r| inner.blocks.get(&(r.round, r.author, r.digest)).cloned())
107 .collect();
108 Ok(blocks)
109 }
110
111 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
112 let inner = self.inner.read();
113 let exist = refs
114 .iter()
115 .map(|r| inner.blocks.contains_key(&(r.round, r.author, r.digest)))
116 .collect();
117 Ok(exist)
118 }
119
120 fn scan_blocks_by_author(
121 &self,
122 author: AuthorityIndex,
123 start_round: Round,
124 ) -> ConsensusResult<Vec<VerifiedBlock>> {
125 let inner = self.inner.read();
126 let mut refs = vec![];
127 for &(author, round, digest) in inner.digests_by_authorities.range((
128 Included((author, start_round, BlockDigest::MIN)),
129 Included((author, Round::MAX, BlockDigest::MAX)),
130 )) {
131 refs.push(BlockRef::new(round, author, digest));
132 }
133 let results = self.read_blocks(refs.as_slice())?;
134 let mut blocks = vec![];
135 for (r, block) in refs.into_iter().zip(results.into_iter()) {
136 if let Some(block) = block {
137 blocks.push(block);
138 } else {
139 panic!("Block {:?} not found!", r);
140 }
141 }
142 Ok(blocks)
143 }
144
145 fn scan_last_blocks_by_author(
146 &self,
147 author: AuthorityIndex,
148 num_of_rounds: u64,
149 before_round: Option<Round>,
150 ) -> ConsensusResult<Vec<VerifiedBlock>> {
151 let before_round = before_round.unwrap_or(Round::MAX);
152 let mut refs = VecDeque::new();
153 for &(author, round, digest) in self
154 .inner
155 .read()
156 .digests_by_authorities
157 .range((
158 Included((author, Round::MIN, BlockDigest::MIN)),
159 Included((author, before_round, BlockDigest::MAX)),
160 ))
161 .rev()
162 .take(num_of_rounds as usize)
163 {
164 refs.push_front(BlockRef::new(round, author, digest));
165 }
166 let refs_slice = refs.make_contiguous();
167 let results = self.read_blocks(refs_slice)?;
168 let mut blocks = vec![];
169 for (r, block) in refs.into_iter().zip(results.into_iter()) {
170 blocks.push(
171 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
172 );
173 }
174 Ok(blocks)
175 }
176
177 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
178 let inner = self.inner.read();
179 Ok(inner
180 .commits
181 .last_key_value()
182 .map(|(_, commit)| commit.clone()))
183 }
184
185 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
186 let inner = self.inner.read();
187 let mut commits = vec![];
188 for (_, commit) in inner.commits.range((
189 Included((range.start(), CommitDigest::MIN)),
190 Included((range.end(), CommitDigest::MAX)),
191 )) {
192 commits.push(commit.clone());
193 }
194 Ok(commits)
195 }
196
197 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
198 let inner = self.inner.read();
199 let votes = inner
200 .commit_votes
201 .range((
202 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
203 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
204 ))
205 .map(|(_, _, block_ref)| *block_ref)
206 .collect();
207 Ok(votes)
208 }
209
210 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
211 let inner = self.inner.read();
212 Ok(inner
213 .commit_info
214 .last_key_value()
215 .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
216 }
217
218 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
219 let inner = self.inner.read();
220 Ok(inner
221 .finalized_commits
222 .last_key_value()
223 .map(|(k, _)| CommitRef::new(k.0, k.1)))
224 }
225
226 fn read_rejected_transactions(
227 &self,
228 commit_ref: CommitRef,
229 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
230 let inner = self.inner.read();
231 Ok(inner
232 .finalized_commits
233 .get(&(commit_ref.index, commit_ref.digest))
234 .cloned())
235 }
236}