1use std::{
5 collections::{BTreeMap, VecDeque},
6 ops::Bound::Included,
7 time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use sui_macros::fail_point;
14use typed_store::{
15 DBMapUtils, Map as _,
16 metrics::SamplingInterval,
17 rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
18};
19
20use super::{CommitInfo, Store, WriteBatch};
21use crate::{
22 block::{BlockAPI as _, SignedBlock, VerifiedBlock},
23 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
24 error::{ConsensusError, ConsensusResult},
25};
26
/// Persistent consensus storage backed by typed-store column families.
///
/// Each field corresponds to one column family (names in the `*_CF` constants
/// on the inherent impl). Key tuples are ordered so that lexicographic key
/// order matches the range-scan patterns used by the `Store` implementation
/// (e.g. author-first keys for per-authority round scans).
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// All blocks, keyed by (round, author, digest). Value is the serialized
    /// `SignedBlock` bytes, so reads can rehydrate a `VerifiedBlock` without
    /// re-serializing (see `read_blocks`).
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Secondary index of block refs keyed author-first, enabling round-range
    /// scans per authority. `#[rename]` keeps the on-disk CF name "digests" —
    /// presumably for backward compatibility with existing databases; confirm.
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Committed sub-dags, keyed by (index, digest). Value is the serialized
    /// commit bytes, deserialized via bcs in `scan_commits`/`read_last_commit`.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Set of commit votes: which block (by ref) voted for which commit.
    /// Unit value — the key tuple itself is the record.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Auxiliary `CommitInfo` recorded per commit ref; written alongside
    /// commits and read back via `read_last_commit_info`.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// For each finalized commit, the rejected transaction indices grouped by
    /// block ref (see `read_rejected_transactions`). Presence of a key also
    /// marks the commit as finalized (see `read_last_finalized_commit`).
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
47
impl RocksDBStore {
    // Column family names. DIGESTS_BY_AUTHORITIES_CF intentionally stays
    // "digests" to match the `#[rename]` attribute on the struct field.
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";

    /// Opens (or creates) the store at `path` using the RocksDB backend.
    ///
    /// All column families are tuned for write throughput; the blocks CF
    /// additionally uses the no-deletion variant (blocks are only appended,
    /// never point-deleted) plus custom block options — the arguments
    /// (512, 128 << 10) look like cache size and block size, but the units
    /// are defined by `set_block_options`; confirm against typed_store.
    #[cfg(not(tidehunter))]
    pub fn new(path: &str) -> Self {
        let db_options = default_db_options().optimize_db_for_write_throughput(2);
        let mut metrics_conf = MetricConf::new("consensus");
        // Sample read metrics once a minute to keep metric overhead low.
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        let cf_options = default_db_options().optimize_for_write_throughput();
        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                Self::BLOCKS_CF.to_string(),
                default_db_options()
                    .optimize_for_write_throughput_no_deletion()
                    .set_block_options(512, 128 << 10),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                cf_options.clone(),
            ),
            (Self::COMMITS_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
        ]));
        Self::open_tables_read_write(
            path.into(),
            metrics_conf,
            Some(db_options.options),
            Some(column_family_options),
        )
    }

    /// Opens (or creates) the store at `path` using the tidehunter backend.
    ///
    /// The `KeyIndexing::key_reduction(full_len, range)` values hard-code the
    /// serialized key sizes of each CF and the byte range used for indexing:
    /// - 36 bytes appears to be (u32 index, 32-byte digest), indexed by the
    ///   first 12 bytes;
    /// - 40 bytes appears to be (u32, u32, 32-byte digest), indexed by the
    ///   first 24 bytes;
    /// - 76 bytes appears to be (u32, 32-byte digest, 40-byte BlockRef),
    ///   indexed by the first 60 bytes.
    /// NOTE(review): these lengths must track the bcs layout of the key
    /// tuples — confirm against the key type definitions if those change.
    #[cfg(tidehunter)]
    pub fn new(path: &str) -> Self {
        tracing::warn!("Consensus store using tidehunter");
        use typed_store::tidehunter_util::{
            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
        };
        let mutexes = default_mutex_count();
        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
        // Prefix key types: number of leading key bits used for sharding —
        // 3 bytes for u32-prefixed keys, 6 bytes for u64-style prefixes.
        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
        let configs = vec![
            (
                Self::BLOCKS_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u64_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::COMMIT_VOTES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    commit_vote_key,
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMIT_INFO_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::FINALIZED_COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
        ];
        Self::open_tables_read_write(
            path.into(),
            // Same 60s metric sampling interval as the RocksDB variant.
            MetricConf::new("consensus")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
}
151
impl Store for RocksDBStore {
    /// Persists a `WriteBatch` atomically: every insert below goes into one
    /// typed-store batch that is committed by the single `batch.write()` at
    /// the end, so blocks, their digest index entries, commit votes, commits,
    /// commit info and finalized commits land together or not at all.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        // Fail points allow crash-injection tests around the write boundary.
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            // Primary block record: serialized bytes keyed round-first.
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Author-first secondary index for per-authority scans.
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Record each commit vote carried inside the block.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
            batch
                .insert_batch(
                    &self.finalized_commits,
                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        // Single atomic commit of everything accumulated above.
        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }

    /// Point-reads the given block refs. The result is positional: entry i is
    /// `Some(block)` if refs[i] was found, `None` otherwise.
    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
        let keys = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let serialized = self.blocks.multi_get(keys)?;
        let mut blocks = vec![];
        for (key, serialized) in refs.iter().zip(serialized) {
            if let Some(serialized) = serialized {
                let signed_block: SignedBlock =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                // Blocks were verified before being written, so the stored
                // bytes are trusted as already-verified here.
                let block = VerifiedBlock::new_verified(signed_block, serialized);
                // Storage-integrity check: the stored bytes must hash back to
                // the key they were stored under.
                assert_eq!(*key, block.reference());
                blocks.push(Some(block));
            } else {
                blocks.push(None);
            }
        }
        Ok(blocks)
    }

    /// Positional existence check for the given refs, without deserializing.
    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
        let refs = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let exist = self.blocks.multi_contains_keys(refs)?;
        Ok(exist)
    }

    /// Returns all of `author`'s blocks from `start_round` onward (inclusive),
    /// in ascending (round, digest) order, via the author-first digest index.
    /// Panics if the index has a ref whose block record is missing, since that
    /// would mean the two CFs — always written in the same batch — diverged.
    fn scan_blocks_by_author(
        &self,
        author: AuthorityIndex,
        start_round: Round,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut refs = vec![];
        for kv in self.digests_by_authorities.safe_range_iter((
            Included((author, start_round, BlockDigest::MIN)),
            Included((author, Round::MAX, BlockDigest::MAX)),
        )) {
            let ((author, round, digest), _) = kv?;
            refs.push(BlockRef::new(round, author, digest));
        }
        let results = self.read_blocks(refs.as_slice())?;
        let mut blocks = Vec::with_capacity(refs.len());
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    /// Returns `author`'s most recent blocks below `before_round` (default:
    /// no upper bound), in ascending order.
    ///
    /// NOTE(review): `take(num_of_rounds)` counts index *entries*, not
    /// distinct rounds — if an authority equivocated (multiple blocks in one
    /// round), fewer than `num_of_rounds` rounds are covered. Confirm this is
    /// the intended semantics given the parameter name.
    fn scan_last_blocks_by_author(
        &self,
        author: AuthorityIndex,
        num_of_rounds: u64,
        before_round: Option<Round>,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let before_round = before_round.unwrap_or(Round::MAX);
        let mut refs = VecDeque::new();
        // Iterate newest-first, then push_front so `refs` ends up ascending.
        for kv in self
            .digests_by_authorities
            .reversed_safe_iter_with_bounds(
                Some((author, Round::MIN, BlockDigest::MIN)),
                Some((author, before_round, BlockDigest::MAX)),
            )?
            .take(num_of_rounds as usize)
        {
            let ((author, round, digest), _) = kv?;
            refs.push_front(BlockRef::new(round, author, digest));
        }
        let refs_slice = refs.make_contiguous();
        let results = self.read_blocks(refs_slice)?;
        let mut blocks = vec![];
        for (r, block) in refs.into_iter().zip(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    /// Returns the commit with the highest (index, digest) key, or `None` if
    /// the store holds no commits yet.
    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
        let Some(result) = self
            .commits
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let ((_index, digest), serialized) = result?;
        let commit = TrustedCommit::new_trusted(
            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
            serialized,
        );
        // Integrity check: stored bytes must hash to the digest in the key.
        assert_eq!(commit.digest(), digest);
        Ok(Some(commit))
    }

    /// Returns all commits whose index falls within `range` (both ends
    /// inclusive), in ascending order.
    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
        let mut commits = vec![];
        for result in self.commits.safe_range_iter((
            Included((range.start(), CommitDigest::MIN)),
            Included((range.end(), CommitDigest::MAX)),
        )) {
            let ((_index, digest), serialized) = result?;
            let commit = TrustedCommit::new_trusted(
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
                serialized,
            );
            assert_eq!(commit.digest(), digest);
            commits.push(commit);
        }
        Ok(commits)
    }

    /// Returns the refs of all blocks that voted for any commit at
    /// `commit_index` (across every commit digest at that index).
    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
        let mut votes = Vec::new();
        for vote in self.commit_votes.safe_range_iter((
            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
        )) {
            let ((_, _, block_ref), _) = vote?;
            votes.push(block_ref);
        }
        Ok(votes)
    }

    /// Returns the `CommitInfo` with the highest commit ref, if any.
    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
        let Some(result) = self
            .commit_info
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
    }

    /// Returns the ref of the finalized commit with the highest key, if any.
    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
        let Some(result) = self
            .finalized_commits
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some(CommitRef::new(index, digest)))
    }

    /// Point-reads the rejected transaction indices (grouped by block ref)
    /// recorded for a finalized commit; `None` if the commit is not finalized.
    fn read_rejected_transactions(
        &self,
        commit_ref: CommitRef,
    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
        let result = self
            .finalized_commits
            .get(&(commit_ref.index, commit_ref.digest))?;
        Ok(result)
    }
}