1use std::{
5 collections::{BTreeMap, VecDeque},
6 ops::Bound::{Excluded, Included},
7 time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16 DBMapUtils, Map as _,
17 metrics::SamplingInterval,
18 rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23 block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25 error::{ConsensusError, ConsensusResult},
26};
27
/// Persistent consensus store: one typed `DBMap` table per kind of record.
///
/// Opening the tables goes through `open_tables_read_write` (presumably generated by the
/// `DBMapUtils` derive — confirm against `typed_store`).
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Serialized blocks, keyed by `(round, author, digest)`.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Key-only index over the same blocks, ordered by `(author, round, digest)` so that
    /// a single author's blocks can be range-scanned. Stored under the table name "digests".
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Serialized commits, keyed by `(commit index, commit digest)`.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Key-only index of commit votes: `(voted commit index, voted commit digest, voting block)`.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// `CommitInfo` recorded per commit, keyed by `(commit index, commit digest)`.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Per finalized commit, the rejected transaction indices grouped by block.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
48
49impl RocksDBStore {
    // Table (column family) names used as keys when configuring per-table options.
    // They mirror the field names of `RocksDBStore`; note that the
    // `digests_by_authorities` field is renamed to "digests".
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";
56
57 #[cfg(not(tidehunter))]
59 pub fn new(path: &str, use_fifo_compaction: bool) -> Self {
60 let db_options =
63 default_db_options().optimize_db_for_write_throughput(2, use_fifo_compaction);
64 let mut metrics_conf = MetricConf::new("consensus");
65 metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
66 let cf_options = if use_fifo_compaction {
67 default_db_options().optimize_for_no_deletion()
68 } else {
69 default_db_options().optimize_for_write_throughput()
70 };
71 let blocks_cf_options = if use_fifo_compaction {
75 default_db_options().optimize_for_no_deletion()
76 } else {
77 default_db_options().optimize_for_write_throughput_no_deletion()
78 };
79 let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
80 (
81 Self::BLOCKS_CF.to_string(),
82 blocks_cf_options
83 .set_block_options(512, 128 << 10),
85 ),
86 (
87 Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
88 cf_options.clone(),
89 ),
90 (Self::COMMITS_CF.to_string(), cf_options.clone()),
91 (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
92 (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
93 (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
94 ]));
95 Self::open_tables_read_write(
96 path.into(),
97 metrics_conf,
98 Some(db_options.options),
99 Some(column_family_options),
100 )
101 }
102
103 #[cfg(tidehunter)]
104 pub fn new(path: &str, _use_fifo_compaction: bool) -> Self {
105 tracing::warn!("Consensus store using tidehunter");
106 use typed_store::tidehunter_util::{
107 KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
108 };
109 let mutexes = default_mutex_count();
110 let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
111 let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
112 let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
113 let u32_prefix = KeyType::from_prefix_bits(3 * 8);
114 let u64_prefix = KeyType::from_prefix_bits(6 * 8);
115 let configs = vec![
116 (
117 Self::BLOCKS_CF.to_string(),
118 ThConfig::new_with_config_indexing(
119 index_index_digest_key.clone(),
120 mutexes,
121 u32_prefix.clone(),
122 KeySpaceConfig::new(),
123 ),
124 ),
125 (
126 Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
127 ThConfig::new_with_config_indexing(
128 index_index_digest_key.clone(),
129 mutexes,
130 u64_prefix.clone(),
131 KeySpaceConfig::new(),
132 ),
133 ),
134 (
135 Self::COMMITS_CF.to_string(),
136 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
137 ),
138 (
139 Self::COMMIT_VOTES_CF.to_string(),
140 ThConfig::new_with_config_indexing(
141 commit_vote_key,
142 mutexes,
143 u32_prefix.clone(),
144 KeySpaceConfig::new(),
145 ),
146 ),
147 (
148 Self::COMMIT_INFO_CF.to_string(),
149 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
150 ),
151 (
152 Self::FINALIZED_COMMITS_CF.to_string(),
153 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
154 ),
155 ];
156 Self::open_tables_read_write(
157 path.into(),
158 MetricConf::new("consensus")
159 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0))
160 .with_th_batch_compression(),
161 configs.into_iter().collect(),
162 )
163 }
164}
165
166impl Store for RocksDBStore {
167 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
168 fail_point!("consensus-store-before-write");
169
170 let mut batch = self.blocks.batch();
171 for block in write_batch.blocks {
172 let block_ref = block.reference();
173 batch
174 .insert_batch(
175 &self.blocks,
176 [(
177 (block_ref.round, block_ref.author, block_ref.digest),
178 block.serialized(),
179 )],
180 )
181 .map_err(ConsensusError::RocksDBFailure)?;
182 batch
183 .insert_batch(
184 &self.digests_by_authorities,
185 [((block_ref.author, block_ref.round, block_ref.digest), ())],
186 )
187 .map_err(ConsensusError::RocksDBFailure)?;
188 for vote in block.commit_votes() {
189 batch
190 .insert_batch(
191 &self.commit_votes,
192 [((vote.index, vote.digest, block_ref), ())],
193 )
194 .map_err(ConsensusError::RocksDBFailure)?;
195 }
196 }
197
198 for commit in write_batch.commits {
199 batch
200 .insert_batch(
201 &self.commits,
202 [((commit.index(), commit.digest()), commit.serialized())],
203 )
204 .map_err(ConsensusError::RocksDBFailure)?;
205 }
206
207 for (commit_ref, commit_info) in write_batch.commit_info {
208 batch
209 .insert_batch(
210 &self.commit_info,
211 [((commit_ref.index, commit_ref.digest), commit_info)],
212 )
213 .map_err(ConsensusError::RocksDBFailure)?;
214 }
215
216 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
217 batch
218 .insert_batch(
219 &self.finalized_commits,
220 [((commit_ref.index, commit_ref.digest), rejected_transactions)],
221 )
222 .map_err(ConsensusError::RocksDBFailure)?;
223 }
224
225 batch.write()?;
226 fail_point!("consensus-store-after-write");
227 Ok(())
228 }
229
230 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
231 let keys = refs
232 .iter()
233 .map(|r| (r.round, r.author, r.digest))
234 .collect::<Vec<_>>();
235 let serialized = self.blocks.multi_get(keys)?;
236 let mut blocks = vec![];
237 for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
238 if let Some(serialized) = serialized {
239 let signed_block: SignedBlock =
240 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
241 let block = VerifiedBlock::new_verified(signed_block, serialized);
243 assert_eq!(*key, block.reference());
245 blocks.push(Some(block));
246 } else {
247 blocks.push(None);
248 }
249 }
250 Ok(blocks)
251 }
252
253 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
254 let refs = refs
255 .iter()
256 .map(|r| (r.round, r.author, r.digest))
257 .collect::<Vec<_>>();
258 let exist = self.blocks.multi_contains_keys(refs)?;
259 Ok(exist)
260 }
261
262 fn scan_blocks_by_author(
263 &self,
264 author: AuthorityIndex,
265 start_round: Round,
266 ) -> ConsensusResult<Vec<VerifiedBlock>> {
267 self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
268 }
269
270 fn scan_blocks_by_author_in_range(
271 &self,
272 author: AuthorityIndex,
273 start_round: Round,
274 end_round: Round,
275 limit: usize,
276 ) -> ConsensusResult<Vec<VerifiedBlock>> {
277 let mut refs = vec![];
278 for kv in self.digests_by_authorities.safe_range_iter((
279 Included((author, start_round, BlockDigest::MIN)),
280 Excluded((author, end_round, BlockDigest::MIN)),
281 )) {
282 let ((author, round, digest), _) = kv?;
283 refs.push(BlockRef::new(round, author, digest));
284 if refs.len() >= limit {
285 break;
286 }
287 }
288 let results = self.read_blocks(refs.as_slice())?;
289 let mut blocks = Vec::with_capacity(refs.len());
290 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
291 blocks.push(
292 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
293 );
294 }
295 Ok(blocks)
296 }
297
298 fn scan_last_blocks_by_author(
302 &self,
303 author: AuthorityIndex,
304 num_of_rounds: u64,
305 before_round: Option<Round>,
306 ) -> ConsensusResult<Vec<VerifiedBlock>> {
307 let before_round = before_round.unwrap_or(Round::MAX);
308 let mut refs = VecDeque::new();
309 for kv in self
310 .digests_by_authorities
311 .reversed_safe_iter_with_bounds(
312 Some((author, Round::MIN, BlockDigest::MIN)),
313 Some((author, before_round, BlockDigest::MAX)),
314 )?
315 .take(num_of_rounds as usize)
316 {
317 let ((author, round, digest), _) = kv?;
318 refs.push_front(BlockRef::new(round, author, digest));
319 }
320 let refs_slice = refs.make_contiguous();
321 let results = self.read_blocks(refs_slice)?;
322 let mut blocks = vec![];
323 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
324 blocks.push(
325 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
326 );
327 }
328 Ok(blocks)
329 }
330
331 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
332 let Some(result) = self
333 .commits
334 .reversed_safe_iter_with_bounds(None, None)?
335 .next()
336 else {
337 return Ok(None);
338 };
339 let ((_index, digest), serialized) = result?;
340 let commit = TrustedCommit::new_trusted(
341 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
342 serialized,
343 );
344 assert_eq!(commit.digest(), digest);
345 Ok(Some(commit))
346 }
347
348 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
349 let mut commits = vec![];
350 for result in self.commits.safe_range_iter((
351 Included((range.start(), CommitDigest::MIN)),
352 Included((range.end(), CommitDigest::MAX)),
353 )) {
354 let ((_index, digest), serialized) = result?;
355 let commit = TrustedCommit::new_trusted(
356 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
357 serialized,
358 );
359 assert_eq!(commit.digest(), digest);
360 commits.push(commit);
361 }
362 Ok(commits)
363 }
364
365 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
366 let mut votes = Vec::new();
367 for vote in self.commit_votes.safe_range_iter((
368 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
369 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
370 )) {
371 let ((_, _, block_ref), _) = vote?;
372 votes.push(block_ref);
373 }
374 Ok(votes)
375 }
376
377 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
378 let Some(result) = self
379 .commit_info
380 .reversed_safe_iter_with_bounds(None, None)?
381 .next()
382 else {
383 return Ok(None);
384 };
385 let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
386 Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
387 }
388
389 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
390 let Some(result) = self
391 .finalized_commits
392 .reversed_safe_iter_with_bounds(None, None)?
393 .next()
394 else {
395 return Ok(None);
396 };
397 let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
398 Ok(Some(CommitRef::new(index, digest)))
399 }
400
401 fn read_rejected_transactions(
402 &self,
403 commit_ref: CommitRef,
404 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
405 let result = self
406 .finalized_commits
407 .get(&(commit_ref.index, commit_ref.digest))?;
408 Ok(result)
409 }
410}