1use std::{
5 collections::{BTreeMap, BTreeSet, VecDeque},
6 ops::Bound::{Excluded, Included},
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
11use mysten_common::ZipDebugEqIteratorExt;
12use parking_lot::RwLock;
13
14use super::{Store, WriteBatch};
15use crate::{
16 block::{BlockAPI as _, VerifiedBlock},
17 commit::{
18 CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
19 TrustedCommit,
20 },
21 error::ConsensusResult,
22};
23
24pub struct MemStore {
26 inner: RwLock<Inner>,
27}
28
29struct Inner {
30 blocks: BTreeMap<(Round, AuthorityIndex, BlockDigest), VerifiedBlock>,
31 digests_by_authorities: BTreeSet<(AuthorityIndex, Round, BlockDigest)>,
32 commits: BTreeMap<(CommitIndex, CommitDigest), TrustedCommit>,
33 commit_votes: BTreeSet<(CommitIndex, CommitDigest, BlockRef)>,
34 commit_info: BTreeMap<(CommitIndex, CommitDigest), CommitInfo>,
35 finalized_commits:
36 BTreeMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
37}
38
39impl MemStore {
40 pub fn new() -> Self {
41 MemStore {
42 inner: RwLock::new(Inner {
43 blocks: BTreeMap::new(),
44 digests_by_authorities: BTreeSet::new(),
45 commits: BTreeMap::new(),
46 commit_votes: BTreeSet::new(),
47 commit_info: BTreeMap::new(),
48 finalized_commits: BTreeMap::new(),
49 }),
50 }
51 }
52}
53
54impl Default for MemStore {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl Store for MemStore {
61 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
62 let mut inner = self.inner.write();
63
64 for block in write_batch.blocks {
65 let block_ref = block.reference();
66 inner.blocks.insert(
67 (block_ref.round, block_ref.author, block_ref.digest),
68 block.clone(),
69 );
70 inner.digests_by_authorities.insert((
71 block_ref.author,
72 block_ref.round,
73 block_ref.digest,
74 ));
75 for vote in block.commit_votes() {
76 inner
77 .commit_votes
78 .insert((vote.index, vote.digest, block_ref));
79 }
80 }
81
82 for commit in write_batch.commits {
83 inner
84 .commits
85 .insert((commit.index(), commit.digest()), commit);
86 }
87
88 for (commit_ref, commit_info) in write_batch.commit_info {
89 inner
90 .commit_info
91 .insert((commit_ref.index, commit_ref.digest), commit_info);
92 }
93
94 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
95 inner
96 .finalized_commits
97 .insert((commit_ref.index, commit_ref.digest), rejected_transactions);
98 }
99
100 Ok(())
101 }
102
103 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
104 let inner = self.inner.read();
105 let blocks = refs
106 .iter()
107 .map(|r| inner.blocks.get(&(r.round, r.author, r.digest)).cloned())
108 .collect();
109 Ok(blocks)
110 }
111
112 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
113 let inner = self.inner.read();
114 let exist = refs
115 .iter()
116 .map(|r| inner.blocks.contains_key(&(r.round, r.author, r.digest)))
117 .collect();
118 Ok(exist)
119 }
120
121 fn scan_blocks_by_author(
122 &self,
123 author: AuthorityIndex,
124 start_round: Round,
125 ) -> ConsensusResult<Vec<VerifiedBlock>> {
126 self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
127 }
128
129 fn scan_blocks_by_author_in_range(
130 &self,
131 author: AuthorityIndex,
132 start_round: Round,
133 end_round: Round,
134 limit: usize,
135 ) -> ConsensusResult<Vec<VerifiedBlock>> {
136 let inner = self.inner.read();
137 let mut refs = vec![];
138 for &(author, round, digest) in inner.digests_by_authorities.range((
139 Included((author, start_round, BlockDigest::MIN)),
140 Excluded((author, end_round, BlockDigest::MIN)),
141 )) {
142 refs.push(BlockRef::new(round, author, digest));
143 if refs.len() >= limit {
144 break;
145 }
146 }
147 let results = self.read_blocks(refs.as_slice())?;
148 let mut blocks = vec![];
149 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
150 if let Some(block) = block {
151 blocks.push(block);
152 } else {
153 panic!("Block {:?} not found!", r);
154 }
155 }
156 Ok(blocks)
157 }
158
159 fn scan_last_blocks_by_author(
160 &self,
161 author: AuthorityIndex,
162 num_of_rounds: u64,
163 before_round: Option<Round>,
164 ) -> ConsensusResult<Vec<VerifiedBlock>> {
165 let before_round = before_round.unwrap_or(Round::MAX);
166 let mut refs = VecDeque::new();
167 for &(author, round, digest) in self
168 .inner
169 .read()
170 .digests_by_authorities
171 .range((
172 Included((author, Round::MIN, BlockDigest::MIN)),
173 Included((author, before_round, BlockDigest::MAX)),
174 ))
175 .rev()
176 .take(num_of_rounds as usize)
177 {
178 refs.push_front(BlockRef::new(round, author, digest));
179 }
180 let refs_slice = refs.make_contiguous();
181 let results = self.read_blocks(refs_slice)?;
182 let mut blocks = vec![];
183 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
184 blocks.push(
185 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
186 );
187 }
188 Ok(blocks)
189 }
190
191 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
192 let inner = self.inner.read();
193 Ok(inner
194 .commits
195 .last_key_value()
196 .map(|(_, commit)| commit.clone()))
197 }
198
199 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
200 let inner = self.inner.read();
201 let mut commits = vec![];
202 for (_, commit) in inner.commits.range((
203 Included((range.start(), CommitDigest::MIN)),
204 Included((range.end(), CommitDigest::MAX)),
205 )) {
206 commits.push(commit.clone());
207 }
208 Ok(commits)
209 }
210
211 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
212 let inner = self.inner.read();
213 let votes = inner
214 .commit_votes
215 .range((
216 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
217 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
218 ))
219 .map(|(_, _, block_ref)| *block_ref)
220 .collect();
221 Ok(votes)
222 }
223
224 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
225 let inner = self.inner.read();
226 Ok(inner
227 .commit_info
228 .last_key_value()
229 .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
230 }
231
232 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
233 let inner = self.inner.read();
234 Ok(inner
235 .finalized_commits
236 .last_key_value()
237 .map(|(k, _)| CommitRef::new(k.0, k.1)))
238 }
239
240 fn read_rejected_transactions(
241 &self,
242 commit_ref: CommitRef,
243 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
244 let inner = self.inner.read();
245 Ok(inner
246 .finalized_commits
247 .get(&(commit_ref.index, commit_ref.digest))
248 .cloned())
249 }
250}