1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 ops::Bound::Included,
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
11use parking_lot::RwLock;
12
13use super::{Store, WriteBatch};
14use crate::{
15 block::{BlockAPI as _, VerifiedBlock},
16 commit::{
17 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
18 TrustedCommit,
19 },
20 error::ConsensusResult,
21};
22
/// In-memory implementation of [`Store`].
///
/// All state lives in a single [`Inner`] value guarded by a `parking_lot::RwLock`;
/// nothing is persisted outside of process memory by this type.
pub struct MemStore {
    /// Every collection of the store, behind one reader-writer lock.
    inner: RwLock<Inner>,
}
27
/// The collections backing a [`MemStore`].
///
/// Ordered maps and sets are used so that lookups by key prefix can be served with range
/// queries.
struct Inner {
    /// Blocks keyed by `(round, author, digest)`.
    blocks: BTreeMap<(Round, AuthorityIndex, BlockDigest), VerifiedBlock>,
    /// Secondary index over the same blocks ordered by `(author, round, digest)`, used for
    /// the per-author round scans.
    digests_by_authorities: BTreeSet<(AuthorityIndex, Round, BlockDigest)>,
    /// Commits keyed by `(index, digest)`.
    commits: BTreeMap<(CommitIndex, CommitDigest), TrustedCommit>,
    /// One entry per commit vote: `(voted commit index, voted commit digest, voting block)`.
    commit_votes: BTreeSet<(CommitIndex, CommitDigest, BlockRef)>,
    /// Commit info keyed by the `(index, digest)` of the commit it belongs to.
    commit_info: BTreeMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Finalized commits keyed by `(index, digest)`; the value maps a block to the indices of
    /// its rejected transactions.
    finalized_commits:
        BTreeMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
37
38impl MemStore {
39 pub fn new() -> Self {
40 MemStore {
41 inner: RwLock::new(Inner {
42 blocks: BTreeMap::new(),
43 digests_by_authorities: BTreeSet::new(),
44 commits: BTreeMap::new(),
45 commit_votes: BTreeSet::new(),
46 commit_info: BTreeMap::new(),
47 finalized_commits: BTreeMap::new(),
48 }),
49 }
50 }
51}
52
53impl Default for MemStore {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl Store for MemStore {
60 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
61 let mut inner = self.inner.write();
62
63 for block in write_batch.blocks {
64 let block_ref = block.reference();
65 inner.blocks.insert(
66 (block_ref.round, block_ref.author, block_ref.digest),
67 block.clone(),
68 );
69 inner.digests_by_authorities.insert((
70 block_ref.author,
71 block_ref.round,
72 block_ref.digest,
73 ));
74 for vote in block.commit_votes() {
75 inner
76 .commit_votes
77 .insert((vote.index, vote.digest, block_ref));
78 }
79 }
80
81 for commit in write_batch.commits {
82 inner
83 .commits
84 .insert((commit.index(), commit.digest()), commit);
85 }
86
87 for (commit_ref, commit_info) in write_batch.commit_info {
88 inner
89 .commit_info
90 .insert((commit_ref.index, commit_ref.digest), commit_info);
91 }
92
93 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
94 inner
95 .finalized_commits
96 .insert((commit_ref.index, commit_ref.digest), rejected_transactions);
97 }
98
99 Ok(())
100 }
101
102 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
103 let inner = self.inner.read();
104 let blocks = refs
105 .iter()
106 .map(|r| inner.blocks.get(&(r.round, r.author, r.digest)).cloned())
107 .collect();
108 Ok(blocks)
109 }
110
111 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
112 let inner = self.inner.read();
113 let exist = refs
114 .iter()
115 .map(|r| inner.blocks.contains_key(&(r.round, r.author, r.digest)))
116 .collect();
117 Ok(exist)
118 }
119
120 fn scan_blocks_by_author(
121 &self,
122 author: AuthorityIndex,
123 start_round: Round,
124 ) -> ConsensusResult<Vec<VerifiedBlock>> {
125 let inner = self.inner.read();
126 let mut refs = vec![];
127 for &(author, round, digest) in inner.digests_by_authorities.range((
128 Included((author, start_round, BlockDigest::MIN)),
129 Included((author, Round::MAX, BlockDigest::MAX)),
130 )) {
131 refs.push(BlockRef::new(round, author, digest));
132 }
133 let results = self.read_blocks(refs.as_slice())?;
134 let mut blocks = vec![];
135 for (r, block) in refs.into_iter().zip(results.into_iter()) {
136 if let Some(block) = block {
137 blocks.push(block);
138 } else {
139 panic!("Block {:?} not found!", r);
140 }
141 }
142 Ok(blocks)
143 }
144
145 fn scan_last_blocks_by_author(
146 &self,
147 author: AuthorityIndex,
148 num_of_rounds: u64,
149 before_round: Option<Round>,
150 ) -> ConsensusResult<Vec<VerifiedBlock>> {
151 let before_round = before_round.unwrap_or(Round::MAX);
152 let mut refs = VecDeque::new();
153 for &(author, round, digest) in self
154 .inner
155 .read()
156 .digests_by_authorities
157 .range((
158 Included((author, Round::MIN, BlockDigest::MIN)),
159 Included((author, before_round, BlockDigest::MAX)),
160 ))
161 .rev()
162 .take(num_of_rounds as usize)
163 {
164 refs.push_front(BlockRef::new(round, author, digest));
165 }
166 let results = self.read_blocks(refs.as_slices().0)?;
167 let mut blocks = vec![];
168 for (r, block) in refs.into_iter().zip(results.into_iter()) {
169 blocks.push(
170 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
171 );
172 }
173 Ok(blocks)
174 }
175
176 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
177 let inner = self.inner.read();
178 Ok(inner
179 .commits
180 .last_key_value()
181 .map(|(_, commit)| commit.clone()))
182 }
183
184 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
185 let inner = self.inner.read();
186 let mut commits = vec![];
187 for (_, commit) in inner.commits.range((
188 Included((range.start(), CommitDigest::MIN)),
189 Included((range.end(), CommitDigest::MAX)),
190 )) {
191 commits.push(commit.clone());
192 }
193 Ok(commits)
194 }
195
196 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
197 let inner = self.inner.read();
198 let votes = inner
199 .commit_votes
200 .range((
201 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
202 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
203 ))
204 .map(|(_, _, block_ref)| *block_ref)
205 .collect();
206 Ok(votes)
207 }
208
209 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
210 let inner = self.inner.read();
211 Ok(inner
212 .commit_info
213 .last_key_value()
214 .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
215 }
216
217 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
218 let inner = self.inner.read();
219 Ok(inner
220 .finalized_commits
221 .last_key_value()
222 .map(|(k, _)| CommitRef::new(k.0, k.1)))
223 }
224
225 fn read_rejected_transactions(
226 &self,
227 commit_ref: CommitRef,
228 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
229 let inner = self.inner.read();
230 Ok(inner
231 .finalized_commits
232 .get(&(commit_ref.index, commit_ref.digest))
233 .cloned())
234 }
235}